diff --git a/.buildkite/docker-compose.py35.pg95.yaml b/.buildkite/docker-compose.py35.pg95.yaml
index aaea33006b..43237b7775 100644
--- a/.buildkite/docker-compose.py35.pg95.yaml
+++ b/.buildkite/docker-compose.py35.pg95.yaml
@@ -17,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
- working_dir: /app
+ working_dir: /src
volumes:
- - ..:/app
+ - ..:/src
diff --git a/.buildkite/docker-compose.py37.pg11.yaml b/.buildkite/docker-compose.py37.pg11.yaml
index 1b32675e78..b767228147 100644
--- a/.buildkite/docker-compose.py37.pg11.yaml
+++ b/.buildkite/docker-compose.py37.pg11.yaml
@@ -17,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
- working_dir: /app
+ working_dir: /src
volumes:
- - ..:/app
+ - ..:/src
diff --git a/.buildkite/docker-compose.py37.pg95.yaml b/.buildkite/docker-compose.py37.pg95.yaml
index 7679f6508d..02fcd28304 100644
--- a/.buildkite/docker-compose.py37.pg95.yaml
+++ b/.buildkite/docker-compose.py37.pg95.yaml
@@ -17,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
- working_dir: /app
+ working_dir: /src
volumes:
- - ..:/app
+ - ..:/src
diff --git a/.buildkite/format_tap.py b/.buildkite/format_tap.py
index 94582f5571..b557a9c38e 100644
--- a/.buildkite/format_tap.py
+++ b/.buildkite/format_tap.py
@@ -1,3 +1,18 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
import sys
from tap.parser import Parser
from tap.line import Result, Unknown, Diagnostic
diff --git a/.buildkite/merge_base_branch.sh b/.buildkite/merge_base_branch.sh
index 26176d6465..eb7219a56d 100755
--- a/.buildkite/merge_base_branch.sh
+++ b/.buildkite/merge_base_branch.sh
@@ -27,7 +27,7 @@ git config --global user.name "A robot"
# Fetch and merge. If it doesn't work, it will raise due to set -e.
git fetch -u origin $GITBASE
-git merge --no-edit origin/$GITBASE
+git merge --no-edit --no-commit origin/$GITBASE
# Show what we are after.
git --no-pager show -s
diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml
deleted file mode 100644
index d9327227ed..0000000000
--- a/.buildkite/pipeline.yml
+++ /dev/null
@@ -1,248 +0,0 @@
-env:
- CODECOV_TOKEN: "2dd7eb9b-0eda-45fe-a47c-9b5ac040045f"
-
-steps:
-
- - command:
- - "python -m pip install tox"
- - "tox -e check_codestyle"
- label: "\U0001F9F9 Check Style"
- plugins:
- - docker#v3.0.1:
- image: "python:3.6"
-
- - command:
- - "python -m pip install tox"
- - "tox -e packaging"
- label: "\U0001F9F9 packaging"
- plugins:
- - docker#v3.0.1:
- image: "python:3.6"
-
- - command:
- - "python -m pip install tox"
- - "tox -e check_isort"
- label: "\U0001F9F9 isort"
- plugins:
- - docker#v3.0.1:
- image: "python:3.6"
-
- - command:
- - "python -m pip install tox"
- - "scripts-dev/check-newsfragment"
- label: ":newspaper: Newsfile"
- branches: "!master !develop !release-*"
- plugins:
- - docker#v3.0.1:
- image: "python:3.6"
- propagate-environment: true
-
- - command:
- - "python -m pip install tox"
- - "tox -e check-sampleconfig"
- label: "\U0001F9F9 check-sample-config"
- plugins:
- - docker#v3.0.1:
- image: "python:3.6"
-
- - command:
- - "python -m pip install tox"
- - "tox -e mypy"
- label: ":mypy: mypy"
- plugins:
- - docker#v3.0.1:
- image: "python:3.5"
-
- - wait
-
- - command:
- - "apt-get update && apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev"
- - "python3.5 -m pip install tox"
- - "tox -e py35-old,codecov"
- label: ":python: 3.5 / SQLite / Old Deps"
- env:
- TRIAL_FLAGS: "-j 2"
- LANG: "C.UTF-8"
- plugins:
- - docker#v3.0.1:
- image: "ubuntu:xenial" # We use xenail to get an old sqlite and python
- propagate-environment: true
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
- - command:
- - "python -m pip install tox"
- - "tox -e py35,codecov"
- label: ":python: 3.5 / SQLite"
- env:
- TRIAL_FLAGS: "-j 2"
- plugins:
- - docker#v3.0.1:
- image: "python:3.5"
- propagate-environment: true
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
- - command:
- - "python -m pip install tox"
- - "tox -e py36,codecov"
- label: ":python: 3.6 / SQLite"
- env:
- TRIAL_FLAGS: "-j 2"
- plugins:
- - docker#v3.0.1:
- image: "python:3.6"
- propagate-environment: true
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
- - command:
- - "python -m pip install tox"
- - "tox -e py37,codecov"
- label: ":python: 3.7 / SQLite"
- env:
- TRIAL_FLAGS: "-j 2"
- plugins:
- - docker#v3.0.1:
- image: "python:3.7"
- propagate-environment: true
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
- - label: ":python: 3.5 / :postgres: 9.5"
- agents:
- queue: "medium"
- env:
- TRIAL_FLAGS: "-j 8"
- command:
- - "bash -c 'python -m pip install tox && python -m tox -e py35-postgres,codecov'"
- plugins:
- - docker-compose#v2.1.0:
- run: testenv
- config:
- - .buildkite/docker-compose.py35.pg95.yaml
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
- - label: ":python: 3.7 / :postgres: 9.5"
- agents:
- queue: "medium"
- env:
- TRIAL_FLAGS: "-j 8"
- command:
- - "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
- plugins:
- - docker-compose#v2.1.0:
- run: testenv
- config:
- - .buildkite/docker-compose.py37.pg95.yaml
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
- - label: ":python: 3.7 / :postgres: 11"
- agents:
- queue: "medium"
- env:
- TRIAL_FLAGS: "-j 8"
- command:
- - "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
- plugins:
- - docker-compose#v2.1.0:
- run: testenv
- config:
- - .buildkite/docker-compose.py37.pg11.yaml
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
-
- - label: "SyTest - :python: 3.5 / SQLite / Monolith"
- agents:
- queue: "medium"
- command:
- - "bash .buildkite/merge_base_branch.sh"
- - "bash /synapse_sytest.sh"
- plugins:
- - docker#v3.0.1:
- image: "matrixdotorg/sytest-synapse:py35"
- propagate-environment: true
- always-pull: true
- workdir: "/src"
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
- - label: "SyTest - :python: 3.5 / :postgres: 9.6 / Monolith"
- agents:
- queue: "medium"
- env:
- POSTGRES: "1"
- command:
- - "bash .buildkite/merge_base_branch.sh"
- - "bash /synapse_sytest.sh"
- plugins:
- - docker#v3.0.1:
- image: "matrixdotorg/sytest-synapse:py35"
- propagate-environment: true
- always-pull: true
- workdir: "/src"
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
-
- - label: "SyTest - :python: 3.5 / :postgres: 9.6 / Workers"
- agents:
- queue: "medium"
- env:
- POSTGRES: "1"
- WORKERS: "1"
- BLACKLIST: "synapse-blacklist-with-workers"
- command:
- - "bash .buildkite/merge_base_branch.sh"
- - "bash -c 'cat /src/sytest-blacklist /src/.buildkite/worker-blacklist > /src/synapse-blacklist-with-workers'"
- - "bash /synapse_sytest.sh"
- plugins:
- - docker#v3.0.1:
- image: "matrixdotorg/sytest-synapse:py35"
- propagate-environment: true
- always-pull: true
- workdir: "/src"
- retry:
- automatic:
- - exit_status: -1
- limit: 2
- - exit_status: 2
- limit: 2
diff --git a/.coveragerc b/.coveragerc
index e9460a340a..11f2ec8387 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -1,7 +1,8 @@
[run]
branch = True
parallel = True
-include = synapse/*
+include=$TOP/synapse/*
+data_file = $TOP/.coverage
[report]
precision = 2
diff --git a/INSTALL.md b/INSTALL.md
index 5728882460..6bce370ea8 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -36,7 +36,7 @@ that your email address is probably `user@example.com` rather than
System requirements:
- POSIX-compliant system (tested on Linux & OS X)
-- Python 3.5, 3.6, 3.7, or 2.7
+- Python 3.5, 3.6, or 3.7
- At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
Synapse is written in Python but some of the libraries it uses are written in
@@ -421,7 +421,7 @@ If Synapse is not configured with an SMTP server, password reset via email will
The easiest way to create a new user is to do so from a client like [Riot](https://riot.im).
-Alternatively you can do so from the command line if you have installed via pip.
+Alternatively you can do so from the command line if you have installed via pip.
This can be done as follows:
diff --git a/changelog.d/5853.feature b/changelog.d/5853.feature
new file mode 100644
index 0000000000..80a04ae2ee
--- /dev/null
+++ b/changelog.d/5853.feature
@@ -0,0 +1 @@
+Opentracing for device list updates.
diff --git a/changelog.d/5892.misc b/changelog.d/5892.misc
new file mode 100644
index 0000000000..939fe8c655
--- /dev/null
+++ b/changelog.d/5892.misc
@@ -0,0 +1 @@
+Compatibility with v2 Identity Service APIs other than /lookup.
\ No newline at end of file
diff --git a/changelog.d/5897.feature b/changelog.d/5897.feature
deleted file mode 100644
index 7b10774c96..0000000000
--- a/changelog.d/5897.feature
+++ /dev/null
@@ -1 +0,0 @@
-Switch to the v2 lookup API for 3PID invites.
\ No newline at end of file
diff --git a/changelog.d/5915.bugfix b/changelog.d/5915.bugfix
new file mode 100644
index 0000000000..bf5b99fedc
--- /dev/null
+++ b/changelog.d/5915.bugfix
@@ -0,0 +1 @@
+Fix 404 for thumbnail download when `dynamic_thumbnails` is `false` and the thumbnail was dynamically generated. Fix reported by rkfg.
diff --git a/changelog.d/5922.misc b/changelog.d/5922.misc
new file mode 100644
index 0000000000..2cc864897e
--- /dev/null
+++ b/changelog.d/5922.misc
@@ -0,0 +1 @@
+Update Buildkite pipeline to use plugins instead of buildkite-agent commands.
diff --git a/changelog.d/5931.misc b/changelog.d/5931.misc
new file mode 100644
index 0000000000..ac8e74f5b9
--- /dev/null
+++ b/changelog.d/5931.misc
@@ -0,0 +1 @@
+Remove unnecessary parentheses in return statements.
\ No newline at end of file
diff --git a/changelog.d/5938.misc b/changelog.d/5938.misc
new file mode 100644
index 0000000000..b5a3b6ee3b
--- /dev/null
+++ b/changelog.d/5938.misc
@@ -0,0 +1 @@
+Remove unused jenkins/prepare_sytest.sh file.
diff --git a/changelog.d/5943.misc b/changelog.d/5943.misc
new file mode 100644
index 0000000000..6545e1244a
--- /dev/null
+++ b/changelog.d/5943.misc
@@ -0,0 +1 @@
+Move Buildkite pipeline config to the pipelines repo.
diff --git a/changelog.d/5953.misc b/changelog.d/5953.misc
new file mode 100644
index 0000000000..38e885f42a
--- /dev/null
+++ b/changelog.d/5953.misc
@@ -0,0 +1 @@
+Update INSTALL.md to say that Python 2 is no longer supported.
diff --git a/changelog.d/5962.misc b/changelog.d/5962.misc
new file mode 100644
index 0000000000..d97d376c36
--- /dev/null
+++ b/changelog.d/5962.misc
@@ -0,0 +1 @@
+Remove unnecessary return statements in the codebase which were the result of a regex run.
\ No newline at end of file
diff --git a/changelog.d/5963.misc b/changelog.d/5963.misc
new file mode 100644
index 0000000000..0d6c3c3d65
--- /dev/null
+++ b/changelog.d/5963.misc
@@ -0,0 +1 @@
+Remove left-over methods from C/S registration API.
\ No newline at end of file
diff --git a/changelog.d/5964.feature b/changelog.d/5964.feature
new file mode 100644
index 0000000000..273c9df026
--- /dev/null
+++ b/changelog.d/5964.feature
@@ -0,0 +1 @@
+Remove `bind_email` and `bind_msisdn` parameters from /register ala MSC2140.
\ No newline at end of file
diff --git a/changelog.d/5966.bugfix b/changelog.d/5966.bugfix
new file mode 100644
index 0000000000..b8ef5a7819
--- /dev/null
+++ b/changelog.d/5966.bugfix
@@ -0,0 +1 @@
+Fix admin API for listing media in a room not being available with an external media repo.
diff --git a/changelog.d/5967.bugfix b/changelog.d/5967.bugfix
new file mode 100644
index 0000000000..8d7bf5c2e9
--- /dev/null
+++ b/changelog.d/5967.bugfix
@@ -0,0 +1 @@
+Fix list media admin API always returning an error.
diff --git a/changelog.d/5970.docker b/changelog.d/5970.docker
new file mode 100644
index 0000000000..c9d04da9cd
--- /dev/null
+++ b/changelog.d/5970.docker
@@ -0,0 +1 @@
+Avoid changing UID/GID if they are already correct.
diff --git a/changelog.d/5971.bugfix b/changelog.d/5971.bugfix
new file mode 100644
index 0000000000..9ea095103b
--- /dev/null
+++ b/changelog.d/5971.bugfix
@@ -0,0 +1 @@
+Fix room and user stats tracking.
diff --git a/changelog.d/5975.misc b/changelog.d/5975.misc
new file mode 100644
index 0000000000..5fcd229b89
--- /dev/null
+++ b/changelog.d/5975.misc
@@ -0,0 +1 @@
+Cleanup event auth type initialisation.
\ No newline at end of file
diff --git a/changelog.d/5980.feature b/changelog.d/5980.feature
new file mode 100644
index 0000000000..f25d8d81d9
--- /dev/null
+++ b/changelog.d/5980.feature
@@ -0,0 +1 @@
+Add POST /_matrix/client/r0/account/3pid/unbind endpoint from MSC2140 for unbinding a 3PID from an identity server without removing it from the homeserver user account.
\ No newline at end of file
diff --git a/changelog.d/5982.bugfix b/changelog.d/5982.bugfix
new file mode 100644
index 0000000000..3ea281a3a0
--- /dev/null
+++ b/changelog.d/5982.bugfix
@@ -0,0 +1 @@
+Include missing opentracing contexts in outbout replication requests.
diff --git a/changelog.d/5983.feature b/changelog.d/5983.feature
new file mode 100644
index 0000000000..aa23ee6dcd
--- /dev/null
+++ b/changelog.d/5983.feature
@@ -0,0 +1 @@
+Add minimum opentracing for client servlets.
diff --git a/changelog.d/5984.bugfix b/changelog.d/5984.bugfix
new file mode 100644
index 0000000000..3387bf82bb
--- /dev/null
+++ b/changelog.d/5984.bugfix
@@ -0,0 +1 @@
+Fix sending of EDUs when opentracing is enabled with an empty whitelist.
diff --git a/contrib/cmdclient/console.py b/contrib/cmdclient/console.py
index af8f39c8c2..899c650b0c 100755
--- a/contrib/cmdclient/console.py
+++ b/contrib/cmdclient/console.py
@@ -268,6 +268,7 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_emailrequest(self, args):
+ # TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/validate/email/requestToken"
@@ -302,6 +303,7 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_emailvalidate(self, args):
+ # TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/validate/email/submitToken"
@@ -330,6 +332,7 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_3pidbind(self, args):
+ # TODO: Update to use v2 Identity Service API endpoint
url = self._identityServerUrl() + "/_matrix/identity/api/v1/3pid/bind"
json_res = yield self.http_client.do_request(
@@ -398,6 +401,7 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_invite(self, roomid, userstring):
if not userstring.startswith("@") and self._is_on("complete_usernames"):
+ # TODO: Update to use v2 Identity Service API endpoint
url = self._identityServerUrl() + "/_matrix/identity/api/v1/lookup"
json_res = yield self.http_client.do_request(
@@ -407,6 +411,7 @@ class SynapseCmd(cmd.Cmd):
mxid = None
if "mxid" in json_res and "signatures" in json_res:
+ # TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/pubkey/ed25519"
diff --git a/docker/start.py b/docker/start.py
index 40a861f200..260f2d9943 100755
--- a/docker/start.py
+++ b/docker/start.py
@@ -41,8 +41,8 @@ def generate_config_from_template(config_dir, config_path, environ, ownership):
config_dir (str): where to put generated config files
config_path (str): where to put the main config file
environ (dict): environment dictionary
- ownership (str): "<user>:<group>" string which will be used to set
- ownership of the generated configs
+ ownership (str|None): "<user>:<group>" string which will be used to set
+ ownership of the generated configs. If None, ownership will not change.
"""
for v in ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"):
if v not in environ:
@@ -105,24 +105,24 @@ def generate_config_from_template(config_dir, config_path, environ, ownership):
log("Generating log config file " + log_config_file)
convert("/conf/log.config", log_config_file, environ)
- subprocess.check_output(["chown", "-R", ownership, "/data"])
-
# Hopefully we already have a signing key, but generate one if not.
- subprocess.check_output(
- [
- "su-exec",
- ownership,
- "python",
- "-m",
- "synapse.app.homeserver",
- "--config-path",
- config_path,
- # tell synapse to put generated keys in /data rather than /compiled
- "--keys-directory",
- config_dir,
- "--generate-keys",
- ]
- )
+ args = [
+ "python",
+ "-m",
+ "synapse.app.homeserver",
+ "--config-path",
+ config_path,
+ # tell synapse to put generated keys in /data rather than /compiled
+ "--keys-directory",
+ config_dir,
+ "--generate-keys",
+ ]
+
+ if ownership is not None:
+ subprocess.check_output(["chown", "-R", ownership, "/data"])
+ args = ["su-exec", ownership] + args
+
+ subprocess.check_output(args)
def run_generate_config(environ, ownership):
@@ -130,7 +130,7 @@ def run_generate_config(environ, ownership):
Args:
environ (dict): env var dict
- ownership (str): "userid:groupid" arg for chmod
+ ownership (str|None): "userid:groupid" arg for chmod. If None, ownership will not change.
Never returns.
"""
@@ -149,9 +149,6 @@ def run_generate_config(environ, ownership):
log("Creating log config %s" % (log_config_file,))
convert("/conf/log.config", log_config_file, environ)
- # make sure that synapse has perms to write to the data dir.
- subprocess.check_output(["chown", ownership, data_dir])
-
args = [
"python",
"-m",
@@ -170,12 +167,33 @@ def run_generate_config(environ, ownership):
"--open-private-ports",
]
# log("running %s" % (args, ))
- os.execv("/usr/local/bin/python", args)
+
+ if ownership is not None:
+ args = ["su-exec", ownership] + args
+ os.execv("/sbin/su-exec", args)
+
+ # make sure that synapse has perms to write to the data dir.
+ subprocess.check_output(["chown", ownership, data_dir])
+ else:
+ os.execv("/usr/local/bin/python", args)
def main(args, environ):
mode = args[1] if len(args) > 1 else None
- ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
+ desired_uid = int(environ.get("UID", "991"))
+ desired_gid = int(environ.get("GID", "991"))
+ if (desired_uid == os.getuid()) and (desired_gid == os.getgid()):
+ ownership = None
+ else:
+ ownership = "{}:{}".format(desired_uid, desired_gid)
+
+ log(
+ "Container running as UserID %s:%s, ENV (or defaults) requests %s:%s"
+ % (os.getuid(), os.getgid(), desired_uid, desired_gid)
+ )
+
+ if ownership is None:
+ log("Will not perform chmod/su-exec as UserID already matches request")
# In generate mode, generate a configuration and missing keys, then exit
if mode == "generate":
@@ -227,16 +245,12 @@ def main(args, environ):
log("Starting synapse with config file " + config_path)
- args = [
- "su-exec",
- ownership,
- "python",
- "-m",
- "synapse.app.homeserver",
- "--config-path",
- config_path,
- ]
- os.execv("/sbin/su-exec", args)
+ args = ["python", "-m", "synapse.app.homeserver", "--config-path", config_path]
+ if ownership is not None:
+ args = ["su-exec", ownership] + args
+ os.execv("/sbin/su-exec", args)
+ else:
+ os.execv("/usr/local/bin/python", args)
if __name__ == "__main__":
diff --git a/docs/room_and_user_statistics.md b/docs/room_and_user_statistics.md
new file mode 100644
index 0000000000..e1facb38d4
--- /dev/null
+++ b/docs/room_and_user_statistics.md
@@ -0,0 +1,62 @@
+Room and User Statistics
+========================
+
+Synapse maintains room and user statistics (as well as a cache of room state),
+in various tables. These can be used for administrative purposes but are also
+used when generating the public room directory.
+
+
+# Synapse Developer Documentation
+
+## High-Level Concepts
+
+### Definitions
+
+* **subject**: Something we are tracking stats about ā currently a room or user.
+* **current row**: An entry for a subject in the appropriate current statistics
+ table. Each subject can have only one.
+* **historical row**: An entry for a subject in the appropriate historical
+ statistics table. Each subject can have any number of these.
+
+### Overview
+
+Stats are maintained as time series. There are two kinds of column:
+
+* absolute columns ā where the value is correct for the time given by `end_ts`
+ in the stats row. (Imagine a line graph for these values)
+ * They can also be thought of as 'gauges' in Prometheus, if you are familiar.
+* per-slice columns ā where the value corresponds to how many of the occurrences
+ occurred within the time slice given by `(end_ts ā bucket_size)ā¦end_ts`
+ or `start_tsā¦end_ts`. (Imagine a histogram for these values)
+
+Stats are maintained in two tables (for each type): current and historical.
+
+Current stats correspond to the present values. Each subject can only have one
+entry.
+
+Historical stats correspond to values in the past. Subjects may have multiple
+entries.
+
+## Concepts around the management of stats
+
+### Current rows
+
+Current rows contain the most up-to-date statistics for a room.
+They only contain absolute columns
+
+### Historical rows
+
+Historical rows can always be considered to be valid for the time slice and
+end time specified.
+
+* historical rows will not exist for every time slice ā they will be omitted
+ if there were no changes. In this case, the following assumptions can be
+ made to interpolate/recreate missing rows:
+ - absolute fields have the same values as in the preceding row
+ - per-slice fields are zero (`0`)
+* historical rows will not be retained forever ā rows older than a configurable
+ time will be purged.
+
+#### Purge
+
+The purging of historical rows is not yet implemented.
diff --git a/jenkins/prepare_synapse.sh b/jenkins/prepare_synapse.sh
deleted file mode 100755
index 016afb8baa..0000000000
--- a/jenkins/prepare_synapse.sh
+++ /dev/null
@@ -1,16 +0,0 @@
-#! /bin/bash
-
-set -eux
-
-cd "`dirname $0`/.."
-
-TOX_DIR=$WORKSPACE/.tox
-
-mkdir -p $TOX_DIR
-
-if ! [ $TOX_DIR -ef .tox ]; then
- ln -s "$TOX_DIR" .tox
-fi
-
-# set up the virtualenv
-tox -e py27 --notest -v
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 7b3a5a8221..ddc195bc32 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -276,25 +276,25 @@ class Auth(object):
self.get_access_token_from_request(request)
)
if app_service is None:
- return (None, None)
+ return None, None
if app_service.ip_range_whitelist:
ip_address = IPAddress(self.hs.get_ip_from_request(request))
if ip_address not in app_service.ip_range_whitelist:
- return (None, None)
+ return None, None
if b"user_id" not in request.args:
- return (app_service.sender, app_service)
+ return app_service.sender, app_service
user_id = request.args[b"user_id"][0].decode("utf8")
if app_service.sender == user_id:
- return (app_service.sender, app_service)
+ return app_service.sender, app_service
if not app_service.is_interested_in_user(user_id):
raise AuthError(403, "Application service cannot masquerade as this user.")
if not (yield self.store.get_user_by_id(user_id)):
raise AuthError(403, "Application service has not registered this user")
- return (user_id, app_service)
+ return user_id, app_service
@defer.inlineCallbacks
def get_user_by_access_token(self, token, rights="access"):
@@ -694,7 +694,7 @@ class Auth(object):
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.check_user_was_in_room(room_id, user_id)
- return (member_event.membership, member_event.event_id)
+ return member_event.membership, member_event.event_id
except AuthError:
visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
@@ -703,8 +703,7 @@ class Auth(object):
visibility
and visibility.content["history_visibility"] == "world_readable"
):
- return (Membership.JOIN, None)
- return
+ return Membership.JOIN, None
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 611d285421..9504bfbc70 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -70,12 +70,12 @@ class PresenceStatusStubServlet(RestServlet):
except HttpResponseException as e:
raise e.to_synapse_error()
- return (200, result)
+ return 200, result
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
- return (200, {})
+ return 200, {}
class KeyUploadServlet(RestServlet):
@@ -126,11 +126,11 @@ class KeyUploadServlet(RestServlet):
self.main_uri + request.uri.decode("ascii"), body, headers=headers
)
- return (200, result)
+ return 200, result
else:
# Just interested in counts.
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
- return (200, {"one_time_key_counts": result})
+ return 200, {"one_time_key_counts": result}
class FrontendProxySlavedStore(
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 007ca75a94..3e25bf5747 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -107,7 +107,6 @@ class ApplicationServiceApi(SimpleHttpClient):
except CodeMessageException as e:
if e.code == 404:
return False
- return
logger.warning("query_user to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("query_user to %s threw exception %s", uri, ex)
@@ -127,7 +126,6 @@ class ApplicationServiceApi(SimpleHttpClient):
logger.warning("query_alias to %s received %s", uri, e.code)
if e.code == 404:
return False
- return
except Exception as ex:
logger.warning("query_alias to %s threw exception %s", uri, ex)
return False
@@ -230,7 +228,6 @@ class ApplicationServiceApi(SimpleHttpClient):
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
return True
- return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
diff --git a/synapse/config/stats.py b/synapse/config/stats.py
index b518a3ed9c..b18ddbd1fa 100644
--- a/synapse/config/stats.py
+++ b/synapse/config/stats.py
@@ -27,19 +27,16 @@ class StatsConfig(Config):
def read_config(self, config, **kwargs):
self.stats_enabled = True
- self.stats_bucket_size = 86400
+ self.stats_bucket_size = 86400 * 1000
self.stats_retention = sys.maxsize
stats_config = config.get("stats", None)
if stats_config:
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
- self.stats_bucket_size = (
- self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
+ self.stats_bucket_size = self.parse_duration(
+ stats_config.get("bucket_size", "1d")
)
- self.stats_retention = (
- self.parse_duration(
- stats_config.get("retention", "%ds" % (sys.maxsize,))
- )
- / 1000
+ self.stats_retention = self.parse_duration(
+ stats_config.get("retention", "%ds" % (sys.maxsize,))
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index 41eabbe717..694fb2c816 100644
--- a/synapse/crypto/event_signing.py
+++ b/synapse/crypto/event_signing.py
@@ -83,7 +83,7 @@ def compute_content_hash(event_dict, hash_algorithm):
event_json_bytes = encode_canonical_json(event_dict)
hashed = hash_algorithm(event_json_bytes)
- return (hashed.name, hashed.digest())
+ return hashed.name, hashed.digest()
def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
@@ -106,7 +106,7 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
event_dict.pop("unsigned", None)
event_json_bytes = encode_canonical_json(event_dict)
hashed = hash_algorithm(event_json_bytes)
- return (hashed.name, hashed.digest())
+ return hashed.name, hashed.digest()
def compute_event_signature(event_dict, signature_name, signing_key):
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index cd52e3f867..4e91df60e6 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -637,11 +637,11 @@ def auth_types_for_event(event):
if event.type == EventTypes.Create:
return []
- auth_types = []
-
- auth_types.append((EventTypes.PowerLevels, ""))
- auth_types.append((EventTypes.Member, event.sender))
- auth_types.append((EventTypes.Create, ""))
+ auth_types = [
+ (EventTypes.PowerLevels, ""),
+ (EventTypes.Member, event.sender),
+ (EventTypes.Create, ""),
+ ]
if event.type == EventTypes.Member:
membership = event.content["membership"]
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index bec3080895..6ee6216660 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -355,7 +355,7 @@ class FederationClient(FederationBase):
auth_chain.sort(key=lambda e: e.depth)
- return (pdus, auth_chain)
+ return pdus, auth_chain
except HttpResponseException as e:
if e.code == 400 or e.code == 404:
logger.info("Failed to use get_room_state_ids API, falling back")
@@ -404,7 +404,7 @@ class FederationClient(FederationBase):
signed_auth.sort(key=lambda e: e.depth)
- return (signed_pdus, signed_auth)
+ return signed_pdus, signed_auth
@defer.inlineCallbacks
def get_events_from_store_or_dest(self, destination, room_id, event_ids):
@@ -429,7 +429,7 @@ class FederationClient(FederationBase):
missing_events.discard(k)
if not missing_events:
- return (signed_events, failed_to_fetch)
+ return signed_events, failed_to_fetch
logger.debug(
"Fetching unknown state/auth events %s for room %s",
@@ -465,7 +465,7 @@ class FederationClient(FederationBase):
# We removed all events we successfully fetched from `batch`
failed_to_fetch.update(batch)
- return (signed_events, failed_to_fetch)
+ return signed_events, failed_to_fetch
@defer.inlineCallbacks
@log_function
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 05fd49f3c1..e5f0b90aec 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -100,7 +100,7 @@ class FederationServer(FederationBase):
res = self._transaction_from_pdus(pdus).get_dict()
- return (200, res)
+ return 200, res
@defer.inlineCallbacks
@log_function
@@ -163,7 +163,7 @@ class FederationServer(FederationBase):
yield self.transaction_actions.set_response(
origin, transaction, 400, response
)
- return (400, response)
+ return 400, response
received_pdus_counter.inc(len(transaction.pdus))
@@ -265,7 +265,7 @@ class FederationServer(FederationBase):
logger.debug("Returning: %s", str(response))
yield self.transaction_actions.set_response(origin, transaction, 200, response)
- return (200, response)
+ return 200, response
@defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
@@ -298,7 +298,7 @@ class FederationServer(FederationBase):
event_id,
)
- return (200, resp)
+ return 200, resp
@defer.inlineCallbacks
def on_state_ids_request(self, origin, room_id, event_id):
@@ -315,7 +315,7 @@ class FederationServer(FederationBase):
state_ids = yield self.handler.get_state_ids_for_pdu(room_id, event_id)
auth_chain_ids = yield self.store.get_auth_chain_ids(state_ids)
- return (200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids})
+ return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
@defer.inlineCallbacks
def _on_context_state_request_compute(self, room_id, event_id):
@@ -345,15 +345,15 @@ class FederationServer(FederationBase):
pdu = yield self.handler.get_persisted_pdu(origin, event_id)
if pdu:
- return (200, self._transaction_from_pdus([pdu]).get_dict())
+ return 200, self._transaction_from_pdus([pdu]).get_dict()
else:
- return (404, "")
+ return 404, ""
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
- return (200, resp)
+ return 200, resp
@defer.inlineCallbacks
def on_make_join_request(self, origin, room_id, user_id, supported_versions):
@@ -435,7 +435,7 @@ class FederationServer(FederationBase):
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
yield self.handler.on_send_leave_request(origin, pdu)
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_event_auth(self, origin, room_id, event_id):
@@ -446,7 +446,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
- return (200, res)
+ return 200, res
@defer.inlineCallbacks
def on_query_auth_request(self, origin, content, room_id, event_id):
@@ -499,7 +499,7 @@ class FederationServer(FederationBase):
"missing": ret.get("missing", []),
}
- return (200, send_content)
+ return 200, send_content
@log_function
def on_query_client_keys(self, origin, content):
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 62ca6a3e87..5b6c79c51a 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -26,6 +26,7 @@ from synapse.logging.opentracing import (
set_tag,
start_active_span_follows_from,
tags,
+ whitelisted_homeserver,
)
from synapse.util.metrics import measure_func
@@ -59,9 +60,15 @@ class TransactionManager(object):
# The span_contexts is a generator so that it won't be evaluated if
# opentracing is disabled. (Yay speed!)
- span_contexts = (
- extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
- )
+ span_contexts = []
+ keep_destination = whitelisted_homeserver(destination)
+
+ for edu in pending_edus:
+ context = edu.get_context()
+ if context:
+ span_contexts.append(extract_text_map(json.loads(context)))
+ if keep_destination:
+ edu.strip_context()
with start_active_span_follows_from("send_transaction", span_contexts):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index f9930b6460..132a8fb5e6 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -342,7 +342,11 @@ class BaseFederationServlet(object):
continue
server.register_paths(
- method, (pattern,), self._wrap(code), self.__class__.__name__
+ method,
+ (pattern,),
+ self._wrap(code),
+ self.__class__.__name__,
+ trace=False,
)
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index aa84621206..b4d743cde7 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -41,6 +41,9 @@ class Edu(JsonEncodedObject):
def get_context(self):
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
+ def strip_context(self):
+ getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}"
+
class Transaction(JsonEncodedObject):
""" A transaction is a list of Pdus and Edus to be sent to a remote home
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 8acd9f9a83..38bc67191c 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -51,8 +51,8 @@ class AccountDataEventSource(object):
{"type": account_data_type, "content": content, "room_id": room_id}
)
- return (results, current_stream_id)
+ return results, current_stream_id
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
- return ([], config.to_id)
+ return [], config.to_id
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index d1a51df6f9..3e9b298154 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -294,12 +294,10 @@ class ApplicationServicesHandler(object):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
return False
- return
user_info = yield self.store.get_user_by_id(user_id)
if user_info:
return False
- return
# user not found; could be the AS though, so check.
services = self.store.get_app_services()
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 0f3ebf7ef8..f844409d21 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -280,7 +280,7 @@ class AuthHandler(BaseHandler):
creds,
list(clientdict),
)
- return (creds, clientdict, session["id"])
+ return creds, clientdict, session["id"]
ret = self._auth_dict_for_flows(flows, session)
ret["completed"] = list(creds)
@@ -722,7 +722,7 @@ class AuthHandler(BaseHandler):
known_login_type = True
is_valid = yield provider.check_password(qualified_user_id, password)
if is_valid:
- return (qualified_user_id, None)
+ return qualified_user_id, None
if not hasattr(provider, "get_supported_login_types") or not hasattr(
provider, "check_auth"
@@ -766,7 +766,7 @@ class AuthHandler(BaseHandler):
)
if canonical_user_id:
- return (canonical_user_id, None)
+ return canonical_user_id, None
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
@@ -816,7 +816,7 @@ class AuthHandler(BaseHandler):
result = (result, None)
return result
- return (None, None)
+ return None, None
@defer.inlineCallbacks
def _check_local_password(self, user_id, password):
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5c1cf83c9d..71a8f33da3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -25,6 +25,7 @@ from synapse.api.errors import (
HttpResponseException,
RequestSendFailed,
)
+from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
+ @trace
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
defer.Deferred: list[dict[str, X]]: info on each device
"""
+ set_tag("user_id", user_id)
device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
for device in devices:
_update_device_from_client_ips(device, ips)
+ log_kv(device_map)
return devices
+ @trace
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
@@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
+
+ set_tag("device", device)
+ set_tag("ips", ips)
+
return device
@measure_func("device.get_user_ids_changed")
+ @trace
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
@@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
user_id (str)
from_token (StreamToken)
"""
+
+ set_tag("user_id", user_id)
+ set_tag("from_token", from_token)
now_room_key = yield self.store.get_room_events_max_id()
room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
+ log_kv(
+ {"event": "encountered empty previous state", "room_id": room_id}
+ )
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
@@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler):
possibly_joined = []
possibly_left = []
- return {"changed": list(possibly_joined), "left": list(possibly_left)}
+ result = {"changed": list(possibly_joined), "left": list(possibly_left)}
+
+ log_kv(result)
+
+ return result
class DeviceHandler(DeviceWorkerHandler):
@@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.")
+ @trace
@defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
@@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
+ set_tag("error", True)
+ log_kv(
+ {"reason": "User doesn't have device id.", "device_id": device_id}
+ )
pass
else:
raise
@@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler):
yield self.notify_device_update(user_id, [device_id])
+ @trace
@defer.inlineCallbacks
def delete_all_devices_for_user(self, user_id, except_device_id=None):
"""Delete all of the user's devices
@@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
+ set_tag("error", True)
+ set_tag("reason", "User doesn't have that device id.")
pass
else:
raise
@@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
+ @trace
@measure_func("notify_device_update")
@defer.inlineCallbacks
def notify_device_update(self, user_id, device_ids):
@@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)
+ set_tag("target_hosts", hosts)
+
position = yield self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts)
)
@@ -405,6 +436,7 @@ class DeviceHandler(DeviceWorkerHandler):
)
for host in hosts:
self.federation_sender.send_device_messages(host)
+ log_kv({"message": "sent device update to host", "host": host})
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
@@ -451,12 +483,15 @@ class DeviceListUpdater(object):
iterable=True,
)
+ @trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""
+ set_tag("origin", origin)
+ set_tag("edu_content", edu_content)
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@@ -471,12 +506,30 @@ class DeviceListUpdater(object):
device_id,
origin,
)
+
+ set_tag("error", True)
+ log_kv(
+ {
+ "message": "Got a device list update edu from a user and "
+ "device which does not match the origin of the request.",
+ "user_id": user_id,
+ "device_id": device_id,
+ }
+ )
return
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
+ set_tag("error", True)
+ log_kv(
+ {
+ "message": "Got an update from a user for which "
+ "we don't share any rooms",
+ "other user_id": user_id,
+ }
+ )
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id,
@@ -578,6 +631,7 @@ class DeviceListUpdater(object):
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
+ log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
@@ -594,13 +648,20 @@ class DeviceListUpdater(object):
# eventually become consistent.
return
except FederationDeniedError as e:
+ set_tag("error", True)
+ log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return
- except Exception:
+ except Exception as e:
# TODO: Remember that we are now out of sync and try again
# later
+ set_tag("error", True)
+ log_kv(
+ {"message": "Exception raised by federation request", "exception": e}
+ )
logger.exception("Failed to handle device list update for %s", user_id)
return
+ log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index c7d56779b8..0043cbea17 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -22,9 +22,9 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
+ log_kv,
set_tag,
start_active_span,
- whitelisted_homeserver,
)
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
@@ -86,7 +86,8 @@ class DeviceMessageHandler(object):
@defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages):
-
+ set_tag("number_of_messages", len(messages))
+ set_tag("sender", sender_user_id)
local_messages = {}
remote_messages = {}
for user_id, by_device in messages.items():
@@ -119,11 +120,10 @@ class DeviceMessageHandler(object):
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
- "org.matrix.opentracing_context": json.dumps(context)
- if whitelisted_homeserver(destination)
- else None,
+ "org.matrix.opentracing_context": json.dumps(context),
}
+ log_kv({"local_messages": local_messages})
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
@@ -132,6 +132,7 @@ class DeviceMessageHandler(object):
"to_device_key", stream_id, users=local_messages.keys()
)
+ log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 2f1f10a9af..5e748687e3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -167,7 +167,6 @@ class EventHandler(BaseHandler):
if not event:
return None
- return
users = yield self.store.get_users_in_room(event.room_id)
is_peeking = user.to_string() not in users
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 94306c94a9..538b16efd6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1428,7 +1428,7 @@ class FederationHandler(BaseHandler):
assert event.user_id == user_id
assert event.state_key == user_id
assert event.room_id == room_id
- return (origin, event, format_ver)
+ return origin, event, format_ver
@defer.inlineCallbacks
@log_function
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 97daca5fee..583b612dd9 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -61,21 +61,76 @@ class IdentityHandler(BaseHandler):
return False
return True
+ def _extract_items_from_creds_dict(self, creds):
+ """
+ Retrieve entries from a "credentials" dictionary
+
+ Args:
+ creds (dict[str, str]): Dictionary of credentials that contain the following keys:
+ * client_secret|clientSecret: A unique secret str provided by the client
+ * id_server|idServer: the domain of the identity server to query
+ * id_access_token: The access token to authenticate to the identity
+ server with.
+
+ Returns:
+ tuple(str, str, str|None): A tuple containing the client_secret, the id_server,
+ and the id_access_token value if available.
+ """
+ client_secret = creds.get("client_secret") or creds.get("clientSecret")
+ if not client_secret:
+ raise SynapseError(
+ 400, "No client_secret in creds", errcode=Codes.MISSING_PARAM
+ )
+
+ id_server = creds.get("id_server") or creds.get("idServer")
+ if not id_server:
+ raise SynapseError(
+ 400, "No id_server in creds", errcode=Codes.MISSING_PARAM
+ )
+
+ id_access_token = creds.get("id_access_token")
+ return client_secret, id_server, id_access_token
+
@defer.inlineCallbacks
- def threepid_from_creds(self, creds):
- if "id_server" in creds:
- id_server = creds["id_server"]
- elif "idServer" in creds:
- id_server = creds["idServer"]
- else:
- raise SynapseError(400, "No id_server in creds")
+ def threepid_from_creds(self, creds, use_v2=True):
+ """
+ Retrieve and validate a threepid identitier from a "credentials" dictionary
+
+ Args:
+ creds (dict[str, str]): Dictionary of credentials that contain the following keys:
+ * client_secret|clientSecret: A unique secret str provided by the client
+ * id_server|idServer: the domain of the identity server to query
+ * id_access_token: The access token to authenticate to the identity
+ server with. Required if use_v2 is true
+ use_v2 (bool): Whether to use v2 Identity Service API endpoints
- if "client_secret" in creds:
- client_secret = creds["client_secret"]
- elif "clientSecret" in creds:
- client_secret = creds["clientSecret"]
+ Returns:
+ Deferred[dict[str,str|int]|None]: A dictionary consisting of response params to
+ the /getValidated3pid endpoint of the Identity Service API, or None if the
+ threepid was not found
+ """
+ client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
+ creds
+ )
+
+ # If an id_access_token is not supplied, force usage of v1
+ if id_access_token is None:
+ use_v2 = False
+
+ query_params = {"sid": creds["sid"], "client_secret": client_secret}
+
+ # Decide which API endpoint URLs and query parameters to use
+ if use_v2:
+ url = "https://%s%s" % (
+ id_server,
+ "/_matrix/identity/v2/3pid/getValidated3pid",
+ )
+ query_params["id_access_token"] = id_access_token
else:
- raise SynapseError(400, "No client_secret in creds")
+ url = "https://%s%s" % (
+ id_server,
+ "/_matrix/identity/api/v1/3pid/getValidated3pid",
+ )
if not self._should_trust_id_server(id_server):
logger.warn(
@@ -85,43 +140,55 @@ class IdentityHandler(BaseHandler):
return None
try:
- data = yield self.http_client.get_json(
- "https://%s%s"
- % (id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid"),
- {"sid": creds["sid"], "client_secret": client_secret},
- )
+ data = yield self.http_client.get_json(url, query_params)
+ return data if "medium" in data else None
except HttpResponseException as e:
- logger.info("getValidated3pid failed with Matrix error: %r", e)
- raise e.to_synapse_error()
+ if e.code != 404 or not use_v2:
+ # Generic failure
+ logger.info("getValidated3pid failed with Matrix error: %r", e)
+ raise e.to_synapse_error()
- if "medium" in data:
- return data
- return None
+ # This identity server is too old to understand Identity Service API v2
+ # Attempt v1 endpoint
+ logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", url)
+ return (yield self.threepid_from_creds(creds, use_v2=False))
@defer.inlineCallbacks
- def bind_threepid(self, creds, mxid):
+ def bind_threepid(self, creds, mxid, use_v2=True):
+ """Bind a 3PID to an identity server
+
+ Args:
+ creds (dict[str, str]): Dictionary of credentials that contain the following keys:
+ * client_secret|clientSecret: A unique secret str provided by the client
+ * id_server|idServer: the domain of the identity server to query
+ * id_access_token: The access token to authenticate to the identity
+ server with. Required if use_v2 is true
+ mxid (str): The MXID to bind the 3PID to
+ use_v2 (bool): Whether to use v2 Identity Service API endpoints
+
+ Returns:
+ Deferred[dict]: The response from the identity server
+ """
logger.debug("binding threepid %r to %s", creds, mxid)
- data = None
- if "id_server" in creds:
- id_server = creds["id_server"]
- elif "idServer" in creds:
- id_server = creds["idServer"]
- else:
- raise SynapseError(400, "No id_server in creds")
+ client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
+ creds
+ )
+
+ # If an id_access_token is not supplied, force usage of v1
+ if id_access_token is None:
+ use_v2 = False
- if "client_secret" in creds:
- client_secret = creds["client_secret"]
- elif "clientSecret" in creds:
- client_secret = creds["clientSecret"]
+ # Decide which API endpoint URLs to use
+ bind_data = {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid}
+ if use_v2:
+ bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
+ bind_data["id_access_token"] = id_access_token
else:
- raise SynapseError(400, "No client_secret in creds")
+ bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
try:
- data = yield self.http_client.post_json_get_json(
- "https://%s%s" % (id_server, "/_matrix/identity/api/v1/3pid/bind"),
- {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid},
- )
+ data = yield self.http_client.post_json_get_json(bind_url, bind_data)
logger.debug("bound threepid %r to %s", creds, mxid)
# Remember where we bound the threepid
@@ -131,13 +198,23 @@ class IdentityHandler(BaseHandler):
address=data["address"],
id_server=id_server,
)
+
+ return data
+ except HttpResponseException as e:
+ if e.code != 404 or not use_v2:
+ logger.error("3PID bind failed with Matrix error: %r", e)
+ raise e.to_synapse_error()
except CodeMessageException as e:
data = json.loads(e.msg) # XXX WAT?
- return data
+ return data
+
+ logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", bind_url)
+ return (yield self.bind_threepid(creds, mxid, use_v2=False))
@defer.inlineCallbacks
def try_unbind_threepid(self, mxid, threepid):
- """Removes a binding from an identity server
+ """Attempt to remove a 3PID from an identity server, or if one is not provided, all
+ identity servers we're aware the binding is present on
Args:
mxid (str): Matrix user ID of binding to be removed
@@ -188,6 +265,8 @@ class IdentityHandler(BaseHandler):
server doesn't support unbinding
"""
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+ url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
+
content = {
"mxid": mxid,
"threepid": {"medium": threepid["medium"], "address": threepid["address"]},
@@ -199,7 +278,7 @@ class IdentityHandler(BaseHandler):
auth_headers = self.federation_http_client.build_auth_headers(
destination=None,
method="POST",
- url_bytes="/_matrix/identity/api/v1/3pid/unbind".encode("ascii"),
+ url_bytes=url_bytes,
content=content,
destination_is=id_server,
)
@@ -282,16 +361,3 @@ class IdentityHandler(BaseHandler):
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
-
-
-class LookupAlgorithm:
- """
- Supported hashing algorithms when performing a 3PID lookup.
-
- SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64
- encoding
- NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext
- """
-
- SHA256 = "sha256"
- NONE = "none"
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 42d6650ed9..f991efeee3 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -449,8 +449,7 @@ class InitialSyncHandler(BaseHandler):
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- return (member_event.membership, member_event.event_id)
- return
+ return member_event.membership, member_event.event_id
except AuthError:
visibility = yield self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
@@ -459,8 +458,7 @@ class InitialSyncHandler(BaseHandler):
visibility
and visibility.content["history_visibility"] == "world_readable"
):
- return (Membership.JOIN, None)
- return
+ return Membership.JOIN, None
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 94a9ca0357..053cf66b28 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -255,7 +255,7 @@ class PresenceHandler(object):
self.unpersisted_users_changes = set()
if unpersisted:
- logger.info("Persisting %d upersisted presence updates", len(unpersisted))
+ logger.info("Persisting %d unpersisted presence updates", len(unpersisted))
yield self.store.update_presence(
[self.user_to_current_state[user_id] for user_id in unpersisted]
)
@@ -1032,7 +1032,7 @@ class PresenceEventSource(object):
#
# Hence this guard where we just return nothing so that the sync
# doesn't return. C.f. #5503.
- return ([], max_token)
+ return [], max_token
presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
@@ -1279,7 +1279,7 @@ def get_interested_parties(store, states):
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
- return (room_ids_to_states, users_to_states)
+ return room_ids_to_states, users_to_states
@defer.inlineCallbacks
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 73973502a4..6854c751a6 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -148,7 +148,7 @@ class ReceiptEventSource(object):
to_key = yield self.get_current_key()
if from_key == to_key:
- return ([], to_key)
+ return [], to_key
events = yield self.store.get_linearized_receipts_for_rooms(
room_ids, from_key=from_key, to_key=to_key
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 4631fab94e..975da57ffd 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,13 +24,11 @@ from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
- InvalidCaptchaError,
LimitExceededError,
RegistrationError,
SynapseError,
)
from synapse.config.server import is_threepid_reserved
-from synapse.http.client import CaptchaServerHttpClient
from synapse.http.servlet import assert_params_in_dict
from synapse.replication.http.login import RegisterDeviceReplicationServlet
from synapse.replication.http.register import (
@@ -39,7 +37,6 @@ from synapse.replication.http.register import (
)
from synapse.types import RoomAlias, RoomID, UserID, create_requester
from synapse.util.async_helpers import Linearizer
-from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -59,7 +56,6 @@ class RegistrationHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
- self.captcha_client = CaptchaServerHttpClient(hs)
self.identity_handler = self.hs.get_handlers().identity_handler
self.ratelimiter = hs.get_registration_ratelimiter()
@@ -362,70 +358,6 @@ class RegistrationHandler(BaseHandler):
)
return user_id
- @defer.inlineCallbacks
- def check_recaptcha(self, ip, private_key, challenge, response):
- """
- Checks a recaptcha is correct.
-
- Used only by c/s api v1
- """
-
- captcha_response = yield self._validate_captcha(
- ip, private_key, challenge, response
- )
- if not captcha_response["valid"]:
- logger.info(
- "Invalid captcha entered from %s. Error: %s",
- ip,
- captcha_response["error_url"],
- )
- raise InvalidCaptchaError(error_url=captcha_response["error_url"])
- else:
- logger.info("Valid captcha entered from %s", ip)
-
- @defer.inlineCallbacks
- def register_email(self, threepidCreds):
- """
- Registers emails with an identity server.
-
- Used only by c/s api v1
- """
-
- for c in threepidCreds:
- logger.info(
- "validating threepidcred sid %s on id server %s",
- c["sid"],
- c["idServer"],
- )
- try:
- threepid = yield self.identity_handler.threepid_from_creds(c)
- except Exception:
- logger.exception("Couldn't validate 3pid")
- raise RegistrationError(400, "Couldn't validate 3pid")
-
- if not threepid:
- raise RegistrationError(400, "Couldn't validate 3pid")
- logger.info(
- "got threepid with medium '%s' and address '%s'",
- threepid["medium"],
- threepid["address"],
- )
-
- if not check_3pid_allowed(self.hs, threepid["medium"], threepid["address"]):
- raise RegistrationError(403, "Third party identifier is not allowed")
-
- @defer.inlineCallbacks
- def bind_emails(self, user_id, threepidCreds):
- """Links emails with a user ID and informs an identity server.
-
- Used only by c/s api v1
- """
-
- # Now we have a matrix ID, bind it to the threepids we were given
- for c in threepidCreds:
- # XXX: This should be a deferred list, shouldn't it?
- yield self.identity_handler.bind_threepid(c, user_id)
-
def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
# don't allow people to register the server notices mxid
if self._server_notices_mxid is not None:
@@ -464,44 +396,7 @@ class RegistrationHandler(BaseHandler):
return str(id)
@defer.inlineCallbacks
- def _validate_captcha(self, ip_addr, private_key, challenge, response):
- """Validates the captcha provided.
-
- Used only by c/s api v1
-
- Returns:
- dict: Containing 'valid'(bool) and 'error_url'(str) if invalid.
-
- """
- response = yield self._submit_captcha(ip_addr, private_key, challenge, response)
- # parse Google's response. Lovely format..
- lines = response.split("\n")
- json = {
- "valid": lines[0] == "true",
- "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?"
- + "error=%s" % lines[1],
- }
- return json
-
- @defer.inlineCallbacks
- def _submit_captcha(self, ip_addr, private_key, challenge, response):
- """
- Used only by c/s api v1
- """
- data = yield self.captcha_client.post_urlencoded_get_raw(
- "http://www.recaptcha.net:80/recaptcha/api/verify",
- args={
- "privatekey": private_key,
- "remoteip": ip_addr,
- "challenge": challenge,
- "response": response,
- },
- )
- return data
-
- @defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
- room_id = None
room_member_handler = self.hs.get_room_member_handler()
if RoomID.is_valid(room_identifier):
room_id = room_identifier
@@ -622,7 +517,7 @@ class RegistrationHandler(BaseHandler):
initial_display_name=initial_display_name,
is_guest=is_guest,
)
- return (r["device_id"], r["access_token"])
+ return r["device_id"], r["access_token"]
valid_until_ms = None
if self.session_lifetime is not None:
@@ -648,9 +543,7 @@ class RegistrationHandler(BaseHandler):
return (device_id, access_token)
@defer.inlineCallbacks
- def post_registration_actions(
- self, user_id, auth_result, access_token, bind_email, bind_msisdn
- ):
+ def post_registration_actions(self, user_id, auth_result, access_token):
"""A user has completed registration
Args:
@@ -659,18 +552,10 @@ class RegistrationHandler(BaseHandler):
registered user.
access_token (str|None): The access token of the newly logged in
device, or None if `inhibit_login` enabled.
- bind_email (bool): Whether to bind the email with the identity
- server.
- bind_msisdn (bool): Whether to bind the msisdn with the identity
- server.
"""
if self.hs.config.worker_app:
yield self._post_registration_client(
- user_id=user_id,
- auth_result=auth_result,
- access_token=access_token,
- bind_email=bind_email,
- bind_msisdn=bind_msisdn,
+ user_id=user_id, auth_result=auth_result, access_token=access_token
)
return
@@ -683,13 +568,11 @@ class RegistrationHandler(BaseHandler):
):
yield self.store.upsert_monthly_active_user(user_id)
- yield self._register_email_threepid(
- user_id, threepid, access_token, bind_email
- )
+ yield self._register_email_threepid(user_id, threepid, access_token)
if auth_result and LoginType.MSISDN in auth_result:
threepid = auth_result[LoginType.MSISDN]
- yield self._register_msisdn_threepid(user_id, threepid, bind_msisdn)
+ yield self._register_msisdn_threepid(user_id, threepid)
if auth_result and LoginType.TERMS in auth_result:
yield self._on_user_consented(user_id, self.hs.config.user_consent_version)
@@ -708,14 +591,12 @@ class RegistrationHandler(BaseHandler):
yield self.post_consent_actions(user_id)
@defer.inlineCallbacks
- def _register_email_threepid(self, user_id, threepid, token, bind_email):
+ def _register_email_threepid(self, user_id, threepid, token):
"""Add an email address as a 3pid identifier
Also adds an email pusher for the email address, if configured in the
HS config
- Also optionally binds emails to the given user_id on the identity server
-
Must be called on master.
Args:
@@ -723,8 +604,6 @@ class RegistrationHandler(BaseHandler):
threepid (object): m.login.email.identity auth response
token (str|None): access_token for the user, or None if not logged
in.
- bind_email (bool): true if the client requested the email to be
- bound at the identity server
Returns:
defer.Deferred:
"""
@@ -766,29 +645,15 @@ class RegistrationHandler(BaseHandler):
data={},
)
- if bind_email:
- logger.info("bind_email specified: binding")
- logger.debug("Binding emails %s to %s" % (threepid, user_id))
- yield self.identity_handler.bind_threepid(
- threepid["threepid_creds"], user_id
- )
- else:
- logger.info("bind_email not specified: not binding email")
-
@defer.inlineCallbacks
- def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn):
+ def _register_msisdn_threepid(self, user_id, threepid):
"""Add a phone number as a 3pid identifier
- Also optionally binds msisdn to the given user_id on the identity server
-
Must be called on master.
Args:
user_id (str): id of user
threepid (object): m.login.msisdn auth response
- token (str): access_token for the user
- bind_email (bool): true if the client requested the email to be
- bound at the identity server
Returns:
defer.Deferred:
"""
@@ -804,12 +669,3 @@ class RegistrationHandler(BaseHandler):
yield self._auth_handler.add_threepid(
user_id, threepid["medium"], threepid["address"], threepid["validated_at"]
)
-
- if bind_msisdn:
- logger.info("bind_msisdn specified: binding")
- logger.debug("Binding msisdn %s to %s", threepid, user_id)
- yield self.identity_handler.bind_threepid(
- threepid["threepid_creds"], user_id
- )
- else:
- logger.info("bind_msisdn not specified: not binding msisdn")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6e47fe7867..a509e11d69 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -852,7 +852,6 @@ class RoomContextHandler(object):
)
if not event:
return None
- return
filtered = yield (filter_evts([event]))
if not filtered:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 4605cb9c0b..093f2ea36e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -29,11 +29,9 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
-from synapse.handlers.identity import LookupAlgorithm
from synapse.types import RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
-from synapse.util.hash import sha256_and_url_safe_base64
from ._base import BaseHandler
@@ -525,7 +523,7 @@ class RoomMemberHandler(object):
event (SynapseEvent): The membership event.
context: The context of the event.
is_guest (bool): Whether the sender is a guest.
- remote_room_hosts (list[str]|None): Homeservers which are likely to already be in
+ room_hosts ([str]): Homeservers which are likely to already be in
the room, and could be danced with in order to join this
homeserver for the first time.
ratelimit (bool): Whether to rate limit this request.
@@ -636,7 +634,7 @@ class RoomMemberHandler(object):
servers.remove(room_alias.domain)
servers.insert(0, room_alias.domain)
- return RoomID.from_string(room_id), servers
+ return (RoomID.from_string(room_id), servers)
@defer.inlineCallbacks
def _get_inviter(self, user_id, room_id):
@@ -699,44 +697,6 @@ class RoomMemberHandler(object):
raise SynapseError(
403, "Looking up third-party identifiers is denied from this server"
)
-
- # Check what hashing details are supported by this identity server
- use_v1 = False
- hash_details = None
- try:
- hash_details = yield self.simple_http_client.get_json(
- "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server)
- )
- except (HttpResponseException, ValueError) as e:
- # Catch HttpResponseExcept for a non-200 response code
- # Catch ValueError for non-JSON response body
-
- # Check if this identity server does not know about v2 lookups
- if e.code == 404:
- # This is an old identity server that does not yet support v2 lookups
- use_v1 = True
- else:
- logger.warn("Error when looking up hashing details: %s" % (e,))
- return None
-
- if use_v1:
- return (yield self._lookup_3pid_v1(id_server, medium, address))
-
- return (yield self._lookup_3pid_v2(id_server, medium, address, hash_details))
-
- @defer.inlineCallbacks
- def _lookup_3pid_v1(self, id_server, medium, address):
- """Looks up a 3pid in the passed identity server using v1 lookup.
-
- Args:
- id_server (str): The server name (including port, if required)
- of the identity server to use.
- medium (str): The type of the third party identifier (e.g. "email").
- address (str): The third party identifier (e.g. "foo@example.com").
-
- Returns:
- str: the matrix ID of the 3pid, or None if it is not recognized.
- """
try:
data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
@@ -751,83 +711,8 @@ class RoomMemberHandler(object):
except IOError as e:
logger.warn("Error from identity server lookup: %s" % (e,))
-
- return None
-
- @defer.inlineCallbacks
- def _lookup_3pid_v2(self, id_server, medium, address, hash_details):
- """Looks up a 3pid in the passed identity server using v2 lookup.
-
- Args:
- id_server (str): The server name (including port, if required)
- of the identity server to use.
- medium (str): The type of the third party identifier (e.g. "email").
- address (str): The third party identifier (e.g. "foo@example.com").
- hash_details (dict[str, str|list]): A dictionary containing hashing information
- provided by an identity server.
-
- Returns:
- Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
- """
- # Extract information from hash_details
- supported_lookup_algorithms = hash_details["algorithms"]
- lookup_pepper = hash_details["lookup_pepper"]
-
- # Check if any of the supported lookup algorithms are present
- if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
- # Perform a hashed lookup
- lookup_algorithm = LookupAlgorithm.SHA256
-
- # Hash address, medium and the pepper with sha256
- to_hash = "%s %s %s" % (address, medium, lookup_pepper)
- lookup_value = sha256_and_url_safe_base64(to_hash)
-
- elif LookupAlgorithm.NONE in supported_lookup_algorithms:
- # Perform a non-hashed lookup
- lookup_algorithm = LookupAlgorithm.NONE
-
- # Combine together plaintext address and medium
- lookup_value = "%s %s" % (address, medium)
-
- else:
- logger.warn(
- "None of the provided lookup algorithms of %s%s are supported: %s",
- id_server_scheme,
- id_server,
- hash_details["algorithms"],
- )
- raise SynapseError(
- 400,
- "Provided identity server does not support any v2 lookup "
- "algorithms that this homeserver supports.",
- )
-
- try:
- lookup_results = yield self.simple_http_client.post_json_get_json(
- "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
- {
- "addresses": [lookup_value],
- "algorithm": lookup_algorithm,
- "pepper": lookup_pepper,
- },
- )
- except (HttpResponseException, ValueError) as e:
- # Catch HttpResponseExcept for a non-200 response code
- # Catch ValueError for non-JSON response body
- logger.warn("Error when performing a 3pid lookup: %s" % (e,))
return None
- # Check for a mapping from what we looked up to an MXID
- if "mappings" not in lookup_results or not isinstance(
- lookup_results["mappings"], dict
- ):
- logger.debug("No results from 3pid lookup")
- return None
-
- # Return the MXID if it's available, or None otherwise
- mxid = lookup_results["mappings"].get(lookup_value)
- return mxid
-
@defer.inlineCallbacks
def _verify_any_signature(self, data, server_hostname):
if server_hostname not in data["signatures"]:
@@ -1018,7 +903,7 @@ class RoomMemberHandler(object):
if not public_keys:
public_keys.append(fallback_public_key)
display_name = data["display_name"]
- return (token, public_keys, fallback_public_key, display_name)
+ return token, public_keys, fallback_public_key, display_name
@defer.inlineCallbacks
def _is_host_in_room(self, current_state_ids):
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..921735edb3 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -14,15 +14,14 @@
# limitations under the License.
import logging
+from collections import Counter
from twisted.internet import defer
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventTypes, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import UserID
-from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -62,11 +61,10 @@ class StatsHandler(StateDeltasHandler):
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
- if not self.hs.config.stats_enabled:
+ if not self.hs.config.stats_enabled or self._is_processing:
return
- if self._is_processing:
- return
+ self._is_processing = True
@defer.inlineCallbacks
def process():
@@ -75,39 +73,72 @@ class StatsHandler(StateDeltasHandler):
finally:
self._is_processing = False
- self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
- self.pos = yield self.store.get_stats_stream_pos()
-
- # If still None then the initial background update hasn't happened yet
- if self.pos is None:
- return None
+ self.pos = yield self.store.get_stats_positions()
# Loop round handling deltas until we're up to date
+
while True:
- with Measure(self.clock, "stats_delta"):
- deltas = yield self.store.get_current_state_deltas(self.pos)
- if not deltas:
- return
+ deltas = yield self.store.get_current_state_deltas(self.pos)
+
+ if deltas:
+ logger.debug("Handling %d state deltas", len(deltas))
+ room_deltas, user_deltas = yield self._handle_deltas(deltas)
+
+ max_pos = deltas[-1]["stream_id"]
+ else:
+ room_deltas = {}
+ user_deltas = {}
+ max_pos = yield self.store.get_room_max_stream_ordering()
- logger.info("Handling %d state deltas", len(deltas))
- yield self._handle_deltas(deltas)
+ # Then count deltas for total_events and total_event_bytes.
+ room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
+ self.pos, max_pos
+ )
+
+ for room_id, fields in room_count.items():
+ room_deltas.setdefault(room_id, {}).update(fields)
+
+ for user_id, fields in user_count.items():
+ user_deltas.setdefault(user_id, {}).update(fields)
+
+ logger.debug("room_deltas: %s", room_deltas)
+ logger.debug("user_deltas: %s", user_deltas)
- self.pos = deltas[-1]["stream_id"]
- yield self.store.update_stats_stream_pos(self.pos)
+ # Always call this so that we update the stats position.
+ yield self.store.bulk_update_stats_delta(
+ self.clock.time_msec(),
+ updates={"room": room_deltas, "user": user_deltas},
+ stream_id=max_pos,
+ )
+
+ event_processing_positions.labels("stats").set(max_pos)
- event_processing_positions.labels("stats").set(self.pos)
+ if self.pos == max_pos:
+ break
+
+ self.pos = max_pos
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
+ """Called with the state deltas to process
+
+ Returns:
+ Deferred[tuple[dict[str, Counter], dict[str, counter]]]
+ Resovles to two dicts, the room deltas and the user deltas,
+ mapping from room/user ID to changes in the various fields.
"""
- Called with the state deltas to process
- """
+
+ room_to_stats_deltas = {}
+ user_to_stats_deltas = {}
+
+ room_to_state_updates = {}
+
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
@@ -115,11 +146,10 @@ class StatsHandler(StateDeltasHandler):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
- stream_pos = delta["stream_id"]
- logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+ logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
- token = yield self.store.get_earliest_token_for_room_stats(room_id)
+ token = yield self.store.get_earliest_token_for_stats("room", room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
@@ -131,203 +161,130 @@ class StatsHandler(StateDeltasHandler):
continue
if event_id is None and prev_event_id is None:
- # Errr...
+ logger.error(
+ "event ID is None and so is the previous event ID. stream_id: %s",
+ stream_id,
+ )
continue
event_content = {}
+ sender = None
if event_id is not None:
event = yield self.store.get_event(event_id, allow_none=True)
if event:
event_content = event.content or {}
+ sender = event.sender
+
+ # All the values in this dict are deltas (RELATIVE changes)
+ room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
- # We use stream_pos here rather than fetch by event_id as event_id
- # may be None
- now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+ room_state = room_to_state_updates.setdefault(room_id, {})
- # quantise time to the nearest bucket
- now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+ if prev_event_id is None:
+ # this state event doesn't overwrite another,
+ # so it is a new effective/current state event
+ room_stats_delta["current_state_events"] += 1
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
- prev_event_content = {}
+ # We take None rather than leave as a previous membership
+ # in the absence of a previous event because we do not want to
+ # reduce the leave count when a new-to-the-room user joins.
+ prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
+ prev_membership = prev_event_content.get(
+ "membership", Membership.LEAVE
+ )
membership = event_content.get("membership", Membership.LEAVE)
- prev_membership = prev_event_content.get("membership", Membership.LEAVE)
-
- if prev_membership == membership:
- continue
- if prev_membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "joined_members", -1
- )
+ if prev_membership is None:
+ logger.debug("No previous membership for this user.")
+ elif membership == prev_membership:
+ pass # noop
+ elif prev_membership == Membership.JOIN:
+ room_stats_delta["joined_members"] -= 1
elif prev_membership == Membership.INVITE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "invited_members", -1
- )
+ room_stats_delta["invited_members"] -= 1
elif prev_membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "left_members", -1
- )
+ room_stats_delta["left_members"] -= 1
elif prev_membership == Membership.BAN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "banned_members", -1
- )
+ room_stats_delta["banned_members"] -= 1
else:
- err = "%s is not a valid prev_membership" % (repr(prev_membership),)
- logger.error(err)
- raise ValueError(err)
+ raise ValueError(
+ "%r is not a valid prev_membership" % (prev_membership,)
+ )
+ if membership == prev_membership:
+ pass # noop
if membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "joined_members", +1
- )
+ room_stats_delta["joined_members"] += 1
elif membership == Membership.INVITE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "invited_members", +1
- )
+ room_stats_delta["invited_members"] += 1
+
+ if sender and self.is_mine_id(sender):
+ user_to_stats_deltas.setdefault(sender, Counter())[
+ "invites_sent"
+ ] += 1
+
elif membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "left_members", +1
- )
+ room_stats_delta["left_members"] += 1
elif membership == Membership.BAN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "banned_members", +1
- )
+ room_stats_delta["banned_members"] += 1
else:
- err = "%s is not a valid membership" % (repr(membership),)
- logger.error(err)
- raise ValueError(err)
+ raise ValueError("%r is not a valid membership" % (membership,))
user_id = state_key
if self.is_mine_id(user_id):
- # update user_stats as it's one of our users
- public = yield self._is_public_room(room_id)
-
- if membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now,
- "user",
- user_id,
- "public_rooms" if public else "private_rooms",
- -1,
- )
- elif membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now,
- "user",
- user_id,
- "public_rooms" if public else "private_rooms",
- +1,
- )
+ # this accounts for transitions like leave ā ban and so on.
+ has_changed_joinedness = (prev_membership == Membership.JOIN) != (
+ membership == Membership.JOIN
+ )
- elif typ == EventTypes.Create:
- # Newly created room. Add it with all blank portions.
- yield self.store.update_room_state(
- room_id,
- {
- "join_rules": None,
- "history_visibility": None,
- "encryption": None,
- "name": None,
- "topic": None,
- "avatar": None,
- "canonical_alias": None,
- },
- )
+ if has_changed_joinedness:
+ delta = +1 if membership == Membership.JOIN else -1
- elif typ == EventTypes.JoinRules:
- yield self.store.update_room_state(
- room_id, {"join_rules": event_content.get("join_rule")}
- )
+ user_to_stats_deltas.setdefault(user_id, Counter())[
+ "joined_rooms"
+ ] += delta
- is_public = yield self._get_key_change(
- prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
- )
- if is_public is not None:
- yield self.update_public_room_stats(now, room_id, is_public)
+ room_stats_delta["local_users_in_room"] += delta
+ elif typ == EventTypes.Create:
+ room_state["is_federatable"] = event_content.get("m.federate", True)
+ if sender and self.is_mine_id(sender):
+ user_to_stats_deltas.setdefault(sender, Counter())[
+ "rooms_created"
+ ] += 1
+ elif typ == EventTypes.JoinRules:
+ room_state["join_rules"] = event_content.get("join_rule")
elif typ == EventTypes.RoomHistoryVisibility:
- yield self.store.update_room_state(
- room_id,
- {"history_visibility": event_content.get("history_visibility")},
- )
-
- is_public = yield self._get_key_change(
- prev_event_id, event_id, "history_visibility", "world_readable"
+ room_state["history_visibility"] = event_content.get(
+ "history_visibility"
)
- if is_public is not None:
- yield self.update_public_room_stats(now, room_id, is_public)
-
elif typ == EventTypes.Encryption:
- yield self.store.update_room_state(
- room_id, {"encryption": event_content.get("algorithm")}
- )
+ room_state["encryption"] = event_content.get("algorithm")
elif typ == EventTypes.Name:
- yield self.store.update_room_state(
- room_id, {"name": event_content.get("name")}
- )
+ room_state["name"] = event_content.get("name")
elif typ == EventTypes.Topic:
- yield self.store.update_room_state(
- room_id, {"topic": event_content.get("topic")}
- )
+ room_state["topic"] = event_content.get("topic")
elif typ == EventTypes.RoomAvatar:
- yield self.store.update_room_state(
- room_id, {"avatar": event_content.get("url")}
- )
+ room_state["avatar"] = event_content.get("url")
elif typ == EventTypes.CanonicalAlias:
- yield self.store.update_room_state(
- room_id, {"canonical_alias": event_content.get("alias")}
- )
+ room_state["canonical_alias"] = event_content.get("alias")
+ elif typ == EventTypes.GuestAccess:
+ room_state["guest_access"] = event_content.get("guest_access")
- @defer.inlineCallbacks
- def update_public_room_stats(self, ts, room_id, is_public):
- """
- Increment/decrement a user's number of public rooms when a room they are
- in changes to/from public visibility.
+ for room_id, state in room_to_state_updates.items():
+ yield self.store.update_room_state(room_id, state)
- Args:
- ts (int): Timestamp in seconds
- room_id (str)
- is_public (bool)
- """
- # For now, blindly iterate over all local users in the room so that
- # we can handle the whole problem of copying buckets over as needed
- user_ids = yield self.store.get_users_in_room(room_id)
-
- for user_id in user_ids:
- if self.hs.is_mine(UserID.from_string(user_id)):
- yield self.store.update_stats_delta(
- ts, "user", user_id, "public_rooms", +1 if is_public else -1
- )
- yield self.store.update_stats_delta(
- ts, "user", user_id, "private_rooms", -1 if is_public else +1
- )
-
- @defer.inlineCallbacks
- def _is_public_room(self, room_id):
- join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
- history_visibility = yield self.state.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility
- )
-
- if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
- (
- history_visibility
- and history_visibility.content.get("history_visibility")
- == "world_readable"
- )
- ):
- return True
- else:
- return False
+ return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ef7f2ca980..19bca6717f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -378,7 +378,7 @@ class SyncHandler(object):
event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
- return (now_token, ephemeral_by_room)
+ return now_token, ephemeral_by_room
@defer.inlineCallbacks
def _load_filtered_recents(
@@ -578,7 +578,6 @@ class SyncHandler(object):
if not last_events:
return None
- return
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
@@ -1332,7 +1331,7 @@ class SyncHandler(object):
)
if not tags_by_room:
logger.debug("no-oping sync")
- return ([], [], [], [])
+ return [], [], [], []
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id
@@ -1642,7 +1641,7 @@ class SyncHandler(object):
)
room_entries.append(entry)
- return (room_entries, invited, newly_joined_rooms, newly_left_rooms)
+ return room_entries, invited, newly_joined_rooms, newly_left_rooms
@defer.inlineCallbacks
def _get_all_rooms(self, sync_result_builder, ignored_users):
@@ -1716,7 +1715,7 @@ class SyncHandler(object):
)
)
- return (room_entries, invited, [])
+ return room_entries, invited, []
@defer.inlineCallbacks
def _generate_room_entry(
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index f882330293..ca8ae9fb5b 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -319,4 +319,4 @@ class TypingNotificationEventSource(object):
return self.get_typing_handler()._latest_room_serial
def get_pagination_rows(self, user, pagination_config, key):
- return ([], pagination_config.from_key)
+ return [], pagination_config.from_key
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 0ac20ebefc..0ae6db8ea7 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -35,7 +35,7 @@ from twisted.internet.interfaces import (
)
from twisted.python.failure import Failure
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, PartialDownloadError, readBody
+from twisted.web.client import Agent, HTTPConnectionPool, readBody
from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
@@ -599,38 +599,6 @@ def _readBodyToFile(response, stream, max_size):
return d
-class CaptchaServerHttpClient(SimpleHttpClient):
- """
- Separate HTTP client for talking to google's captcha servers
- Only slightly special because accepts partial download responses
-
- used only by c/s api v1
- """
-
- @defer.inlineCallbacks
- def post_urlencoded_get_raw(self, url, args={}):
- query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
-
- response = yield self.request(
- "POST",
- url,
- data=query_bytes,
- headers=Headers(
- {
- b"Content-Type": [b"application/x-www-form-urlencoded"],
- b"User-Agent": [self.user_agent],
- }
- ),
- )
-
- try:
- body = yield make_deferred_yieldable(readBody(response))
- return body
- except PartialDownloadError as e:
- # twisted dislikes google's response, no content length.
- return e.response
-
-
def encode_urlencode_args(args):
return {k: encode_urlencode_arg(v) for k, v in args.items()}
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 5e9b0befb0..7ddfad286d 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -207,7 +207,7 @@ class WellKnownResolver(object):
cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID,
)
- return (result, cache_period)
+ return result, cache_period
@defer.inlineCallbacks
def _make_well_known_request(self, server_name, retry):
diff --git a/synapse/http/server.py b/synapse/http/server.py
index e6f351ba3b..cb9158fe1b 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -40,6 +40,7 @@ from synapse.api.errors import (
UnrecognizedRequestError,
)
from synapse.logging.context import preserve_fn
+from synapse.logging.opentracing import trace_servlet
from synapse.util.caches import intern_dict
logger = logging.getLogger(__name__)
@@ -257,7 +258,9 @@ class JsonResource(HttpServer, resource.Resource):
self.path_regexs = {}
self.hs = hs
- def register_paths(self, method, path_patterns, callback, servlet_classname):
+ def register_paths(
+ self, method, path_patterns, callback, servlet_classname, trace=True
+ ):
"""
Registers a request handler against a regular expression. Later request URLs are
checked against these regular expressions in order to identify an appropriate
@@ -273,8 +276,16 @@ class JsonResource(HttpServer, resource.Resource):
servlet_classname (str): The name of the handler to be used in prometheus
and opentracing logs.
+
+ trace (bool): Whether we should start a span to trace the servlet.
"""
method = method.encode("utf-8") # method is bytes on py3
+
+ if trace:
+ # We don't extract the context from the servlet because we can't
+ # trust the sender
+ callback = trace_servlet(servlet_classname)(callback)
+
for path_pattern in path_patterns:
logger.debug("Registering for %s %s", method, path_pattern.pattern)
self.path_regexs.setdefault(method, []).append(
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index c186b31f59..274c1a6a87 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -20,7 +20,6 @@ import logging
from canonicaljson import json
from synapse.api.errors import Codes, SynapseError
-from synapse.logging.opentracing import trace_servlet
logger = logging.getLogger(__name__)
@@ -298,10 +297,7 @@ class RestServlet(object):
servlet_classname = self.__class__.__name__
method_handler = getattr(self, "on_%s" % (method,))
http_server.register_paths(
- method,
- patterns,
- trace_servlet(servlet_classname)(method_handler),
- servlet_classname,
+ method, patterns, method_handler, servlet_classname
)
else:
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index dd296027a1..2c34b54702 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -85,14 +85,14 @@ the function becomes the operation name for the span.
return something_usual_and_useful
-Operation names can be explicitly set for functions by using
-``trace_using_operation_name``
+Operation names can be explicitly set for a function by passing the
+operation name to ``trace``
.. code-block:: python
- from synapse.logging.opentracing import trace_using_operation_name
+ from synapse.logging.opentracing import trace
- @trace_using_operation_name("A *much* better operation name")
+ @trace(opname="a_better_operation_name")
def interesting_badly_named_function(*args, **kwargs):
# Does all kinds of cool and expected things
return something_usual_and_useful
@@ -319,7 +319,7 @@ def whitelisted_homeserver(destination):
Args:
destination (str)
"""
- _homeserver_whitelist
+
if _homeserver_whitelist:
return _homeserver_whitelist.match(destination)
return False
@@ -493,6 +493,11 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T
Args:
headers (twisted.web.http_headers.Headers)
+ destination (str): address of entity receiving the span context. If check_destination
+ is true the context will only be injected if the destination matches the
+ opentracing whitelist
+ check_destination (bool): If false, destination will be ignored and the context
+ will always be injected.
span (opentracing.Span)
Returns:
@@ -525,6 +530,11 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
Args:
headers (dict)
+ destination (str): address of entity receiving the span context. If check_destination
+ is true the context will only be injected if the destination matches the
+ opentracing whitelist
+ check_destination (bool): If false, destination will be ignored and the context
+ will always be injected.
span (opentracing.Span)
Returns:
@@ -537,7 +547,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
here:
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
- if not whitelisted_homeserver(destination):
+ if check_destination and not whitelisted_homeserver(destination):
return
span = opentracing.tracer.active_span
@@ -556,9 +566,11 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
Args:
carrier (dict)
- destination (str): the name of the remote server. The span context
- will only be injected if the destination matches the homeserver_whitelist
- or destination is None.
+ destination (str): address of entity receiving the span context. If check_destination
+ is true the context will only be injected if the destination matches the
+ opentracing whitelist
+ check_destination (bool): If false, destination will be ignored and the context
+ will always be injected.
Returns:
In-place modification of carrier
@@ -641,66 +653,26 @@ def extract_text_map(carrier):
# Tracing decorators
-def trace(func):
+def trace(func=None, opname=None):
"""
Decorator to trace a function.
- Sets the operation name to that of the function's.
+ Sets the operation name to that of the function's or that given
+ as operation_name. See the module's doc string for usage
+ examples.
"""
- if opentracing is None:
- return func
- @wraps(func)
- def _trace_inner(self, *args, **kwargs):
- if opentracing is None:
- return func(self, *args, **kwargs)
-
- scope = start_active_span(func.__name__)
- scope.__enter__()
-
- try:
- result = func(self, *args, **kwargs)
- if isinstance(result, defer.Deferred):
-
- def call_back(result):
- scope.__exit__(None, None, None)
- return result
-
- def err_back(result):
- scope.span.set_tag(tags.ERROR, True)
- scope.__exit__(None, None, None)
- return result
-
- result.addCallbacks(call_back, err_back)
-
- else:
- scope.__exit__(None, None, None)
-
- return result
-
- except Exception as e:
- scope.__exit__(type(e), None, e.__traceback__)
- raise
-
- return _trace_inner
-
-
-def trace_using_operation_name(operation_name):
- """Decorator to trace a function. Explicitely sets the operation_name."""
-
- def trace(func):
- """
- Decorator to trace a function.
- Sets the operation name to that of the function's.
- """
+ def decorator(func):
if opentracing is None:
return func
+ _opname = opname if opname else func.__name__
+
@wraps(func)
def _trace_inner(self, *args, **kwargs):
if opentracing is None:
return func(self, *args, **kwargs)
- scope = start_active_span(operation_name)
+ scope = start_active_span(_opname)
scope.__enter__()
try:
@@ -717,6 +689,7 @@ def trace_using_operation_name(operation_name):
return result
result.addCallbacks(call_back, err_back)
+
else:
scope.__exit__(None, None, None)
@@ -728,7 +701,10 @@ def trace_using_operation_name(operation_name):
return _trace_inner
- return trace
+ if func:
+ return decorator(func)
+ else:
+ return decorator
def tag_args(func):
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 41147d4292..735b882363 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -101,7 +101,7 @@ class ModuleApi(object):
)
user_id = yield self.register_user(localpart, displayname, emails)
_, access_token = yield self.register_device(user_id)
- return (user_id, access_token)
+ return user_id, access_token
def register_user(self, localpart, displayname=None, emails=[]):
"""Registers a new user with given localpart and optional displayname, emails.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index bd80c801b6..4e091314e6 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -472,11 +472,11 @@ class Notifier(object):
joined_room_ids = yield self.store.get_rooms_for_user(user.to_string())
if explicit_room_id:
if explicit_room_id in joined_room_ids:
- return ([explicit_room_id], True)
+ return [explicit_room_id], True
if (yield self._is_world_readable(explicit_room_id)):
- return ([explicit_room_id], False)
+ return [explicit_room_id], False
raise AuthError(403, "Non-joined access not allowed")
- return (joined_room_ids, True)
+ return joined_room_ids, True
@defer.inlineCallbacks
def _is_world_readable(self, room_id):
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index c831975635..22491f3700 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -134,7 +134,7 @@ class BulkPushRuleEvaluator(object):
pl_event = auth_events.get(POWER_KEY)
- return (pl_event.content if pl_event else {}, sender_level)
+ return pl_event.content if pl_event else {}, sender_level
@defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index c4be9273f6..afc9a8ff29 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -22,13 +22,13 @@ from six.moves import urllib
from twisted.internet import defer
-import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
RequestSendFailed,
SynapseError,
)
+from synapse.logging.opentracing import inject_active_span_byte_dict, trace_servlet
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
@@ -167,9 +167,7 @@ class ReplicationEndpoint(object):
# the master, and so whether we should clean up or not.
while True:
headers = {}
- opentracing.inject_active_span_byte_dict(
- headers, None, check_destination=False
- )
+ inject_active_span_byte_dict(headers, None, check_destination=False)
try:
result = yield request_func(uri, data, headers=headers)
break
@@ -210,13 +208,11 @@ class ReplicationEndpoint(object):
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
+ handler = trace_servlet(self.__class__.__name__, extract_context=True)(handler)
+ # We don't let register paths trace this servlet using the default tracing
+ # options because we wish to extract the context explicitly.
http_server.register_paths(
- method,
- [pattern],
- opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
- handler
- ),
- self.__class__.__name__,
+ method, [pattern], handler, self.__class__.__name__, trace=False
)
def _cached_handler(self, request, txn_id, **kwargs):
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index fed4f08820..2f16955954 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -113,7 +113,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_and_contexts, backfilled
)
- return (200, {})
+ return 200, {}
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
@@ -156,7 +156,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
result = yield self.registry.on_edu(edu_type, origin, edu_content)
- return (200, result)
+ return 200, result
class ReplicationGetQueryRestServlet(ReplicationEndpoint):
@@ -204,7 +204,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
result = yield self.registry.on_query(query_type, args)
- return (200, result)
+ return 200, result
class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
@@ -238,7 +238,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
def _handle_request(self, request, room_id):
yield self.store.clean_room_for_join(room_id)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index f17d3a2da4..786f5232b2 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -64,7 +64,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
user_id, device_id, initial_display_name, is_guest
)
- return (200, {"device_id": device_id, "access_token": access_token})
+ return 200, {"device_id": device_id, "access_token": access_token}
def register_servlets(hs, http_server):
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 4217335d88..b9ce3477ad 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -83,7 +83,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
remote_room_hosts, room_id, user_id, event_content
)
- return (200, {})
+ return 200, {}
class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
@@ -153,7 +153,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
yield self.store.locally_reject_invite(user_id, room_id)
ret = {}
- return (200, ret)
+ return 200, ret
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
@@ -202,7 +202,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
else:
raise Exception("Unrecognized change: %r", change)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 3341320a87..38260256cf 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -90,7 +90,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
address=content["address"],
)
- return (200, {})
+ return 200, {}
class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
@@ -106,7 +106,7 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
self.registration_handler = hs.get_registration_handler()
@staticmethod
- def _serialize_payload(user_id, auth_result, access_token, bind_email, bind_msisdn):
+ def _serialize_payload(user_id, auth_result, access_token):
"""
Args:
user_id (str): The user ID that consented
@@ -114,17 +114,8 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
registered user.
access_token (str|None): The access token of the newly logged in
device, or None if `inhibit_login` enabled.
- bind_email (bool): Whether to bind the email with the identity
- server
- bind_msisdn (bool): Whether to bind the msisdn with the identity
- server
"""
- return {
- "auth_result": auth_result,
- "access_token": access_token,
- "bind_email": bind_email,
- "bind_msisdn": bind_msisdn,
- }
+ return {"auth_result": auth_result, "access_token": access_token}
@defer.inlineCallbacks
def _handle_request(self, request, user_id):
@@ -132,18 +123,12 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
auth_result = content["auth_result"]
access_token = content["access_token"]
- bind_email = content["bind_email"]
- bind_msisdn = content["bind_msisdn"]
yield self.registration_handler.post_registration_actions(
- user_id=user_id,
- auth_result=auth_result,
- access_token=access_token,
- bind_email=bind_email,
- bind_msisdn=bind_msisdn,
+ user_id=user_id, auth_result=auth_result, access_token=access_token
)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index eff7bd7305..adb9b2f7f4 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -117,7 +117,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index c10b85d2ff..f03111c259 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -158,7 +158,7 @@ class Stream(object):
updates, current_token = yield self.get_updates_since(self.last_token)
self.last_token = current_token
- return (updates, current_token)
+ return updates, current_token
@defer.inlineCallbacks
def get_updates_since(self, from_token):
@@ -172,14 +172,14 @@ class Stream(object):
sent over the replication steam.
"""
if from_token in ("NOW", "now"):
- return ([], self.upto_token)
+ return [], self.upto_token
current_token = self.upto_token
from_token = int(from_token)
if from_token == current_token:
- return ([], current_token)
+ return [], current_token
if self._LIMITED:
rows = yield self.update_function(
@@ -198,7 +198,7 @@ class Stream(object):
if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND:
raise Exception("stream %s has fallen behind" % (self.NAME))
- return (updates, current_token)
+ return updates, current_token
def current_token(self):
"""Gets the current token of the underlying streams. Should be provided
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index fa91cc8dee..81b6bd8816 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -41,7 +41,7 @@ from synapse.rest.admin._base import (
assert_user_is_admin,
historical_admin_path_patterns,
)
-from synapse.rest.admin.media import register_servlets_for_media_repo
+from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.users import UserAdminServlet
@@ -69,7 +69,7 @@ class UsersRestServlet(RestServlet):
ret = yield self.handlers.admin_handler.get_users()
- return (200, ret)
+ return 200, ret
class VersionServlet(RestServlet):
@@ -120,7 +120,7 @@ class UserRegisterServlet(RestServlet):
nonce = self.hs.get_secrets().token_hex(64)
self.nonces[nonce] = int(self.reactor.seconds())
- return (200, {"nonce": nonce})
+ return 200, {"nonce": nonce}
@defer.inlineCallbacks
def on_POST(self, request):
@@ -212,7 +212,7 @@ class UserRegisterServlet(RestServlet):
)
result = yield register._create_registration_details(user_id, body)
- return (200, result)
+ return 200, result
class WhoisRestServlet(RestServlet):
@@ -237,7 +237,7 @@ class WhoisRestServlet(RestServlet):
ret = yield self.handlers.admin_handler.get_whois(target_user)
- return (200, ret)
+ return 200, ret
class PurgeHistoryRestServlet(RestServlet):
@@ -322,7 +322,7 @@ class PurgeHistoryRestServlet(RestServlet):
room_id, token, delete_local_events=delete_local_events
)
- return (200, {"purge_id": purge_id})
+ return 200, {"purge_id": purge_id}
class PurgeHistoryStatusRestServlet(RestServlet):
@@ -347,7 +347,7 @@ class PurgeHistoryStatusRestServlet(RestServlet):
if purge_status is None:
raise NotFoundError("purge id '%s' not found" % purge_id)
- return (200, purge_status.asdict())
+ return 200, purge_status.asdict()
class DeactivateAccountRestServlet(RestServlet):
@@ -379,7 +379,7 @@ class DeactivateAccountRestServlet(RestServlet):
else:
id_server_unbind_result = "no-support"
- return (200, {"id_server_unbind_result": id_server_unbind_result})
+ return 200, {"id_server_unbind_result": id_server_unbind_result}
class ShutdownRoomRestServlet(RestServlet):
@@ -549,7 +549,7 @@ class ResetPasswordRestServlet(RestServlet):
yield self._set_password_handler.set_password(
target_user_id, new_password, requester
)
- return (200, {})
+ return 200, {}
class GetUsersPaginatedRestServlet(RestServlet):
@@ -591,7 +591,7 @@ class GetUsersPaginatedRestServlet(RestServlet):
logger.info("limit: %s, start: %s", limit, start)
ret = yield self.handlers.admin_handler.get_users_paginate(order, start, limit)
- return (200, ret)
+ return 200, ret
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
@@ -619,7 +619,7 @@ class GetUsersPaginatedRestServlet(RestServlet):
logger.info("limit: %s, start: %s", limit, start)
ret = yield self.handlers.admin_handler.get_users_paginate(order, start, limit)
- return (200, ret)
+ return 200, ret
class SearchUsersRestServlet(RestServlet):
@@ -662,7 +662,7 @@ class SearchUsersRestServlet(RestServlet):
logger.info("term: %s ", term)
ret = yield self.handlers.admin_handler.search_users(term)
- return (200, ret)
+ return 200, ret
class DeleteGroupAdminRestServlet(RestServlet):
@@ -685,7 +685,7 @@ class DeleteGroupAdminRestServlet(RestServlet):
raise SynapseError(400, "Can only delete local groups")
yield self.group_server.delete_group(group_id, requester.user.to_string())
- return (200, {})
+ return 200, {}
class AccountValidityRenewServlet(RestServlet):
@@ -716,7 +716,7 @@ class AccountValidityRenewServlet(RestServlet):
)
res = {"expiration_ts": expiration_ts}
- return (200, res)
+ return 200, res
########################################################################################
@@ -761,9 +761,12 @@ def register_servlets_for_client_rest_resource(hs, http_server):
DeleteGroupAdminRestServlet(hs).register(http_server)
AccountValidityRenewServlet(hs).register(http_server)
- # Load the media repo ones if we're using them.
+ # Load the media repo ones if we're using them. Otherwise load the servlets which
+ # don't need a media repo (typically readonly admin APIs).
if hs.config.can_load_media_repo:
register_servlets_for_media_repo(hs, http_server)
+ else:
+ ListMediaInRoom(hs).register(http_server)
# don't add more things here: new servlets should only be exposed on
# /_synapse/admin so should not go here. Instead register them in AdminRestResource.
diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
index 824df919f2..ed7086d09c 100644
--- a/synapse/rest/admin/media.py
+++ b/synapse/rest/admin/media.py
@@ -49,7 +49,7 @@ class QuarantineMediaInRoom(RestServlet):
room_id, requester.user.to_string()
)
- return (200, {"num_quarantined": num_quarantined})
+ return 200, {"num_quarantined": num_quarantined}
class ListMediaInRoom(RestServlet):
@@ -60,6 +60,7 @@ class ListMediaInRoom(RestServlet):
def __init__(self, hs):
self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@@ -70,7 +71,7 @@ class ListMediaInRoom(RestServlet):
local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
- return (200, {"local": local_mxcs, "remote": remote_mxcs})
+ return 200, {"local": local_mxcs, "remote": remote_mxcs}
class PurgeMediaCacheRestServlet(RestServlet):
@@ -89,7 +90,7 @@ class PurgeMediaCacheRestServlet(RestServlet):
ret = yield self.media_repository.delete_old_remote_media(before_ts)
- return (200, ret)
+ return 200, ret
def register_servlets_for_media_repo(hs, http_server):
diff --git a/synapse/rest/admin/purge_room_servlet.py b/synapse/rest/admin/purge_room_servlet.py
index 2922eb543e..f474066542 100644
--- a/synapse/rest/admin/purge_room_servlet.py
+++ b/synapse/rest/admin/purge_room_servlet.py
@@ -54,4 +54,4 @@ class PurgeRoomServlet(RestServlet):
await self.pagination_handler.purge_room(body["room_id"])
- return (200, {})
+ return 200, {}
diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py
index 656526fea5..ae2cbe2e0a 100644
--- a/synapse/rest/admin/server_notice_servlet.py
+++ b/synapse/rest/admin/server_notice_servlet.py
@@ -92,7 +92,7 @@ class SendServerNoticeServlet(RestServlet):
event_content=body["content"],
)
- return (200, {"event_id": event.event_id})
+ return 200, {"event_id": event.event_id}
def on_PUT(self, request, txn_id):
return self.txns.fetch_or_execute_request(
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 5364117420..9720a3bab0 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -71,7 +71,7 @@ class UserAdminServlet(RestServlet):
is_admin = yield self.handlers.admin_handler.get_user_server_admin(target_user)
is_admin = bool(is_admin)
- return (200, {"admin": is_admin})
+ return 200, {"admin": is_admin}
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
@@ -97,4 +97,4 @@ class UserAdminServlet(RestServlet):
target_user, set_admin_to
)
- return (200, {})
+ return 200, {}
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 4284738021..4ea3666874 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -54,7 +54,7 @@ class ClientDirectoryServer(RestServlet):
dir_handler = self.handlers.directory_handler
res = yield dir_handler.get_association(room_alias)
- return (200, res)
+ return 200, res
@defer.inlineCallbacks
def on_PUT(self, request, room_alias):
@@ -87,7 +87,7 @@ class ClientDirectoryServer(RestServlet):
requester, room_alias, room_id, servers
)
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_DELETE(self, request, room_alias):
@@ -102,7 +102,7 @@ class ClientDirectoryServer(RestServlet):
service.url,
room_alias.to_string(),
)
- return (200, {})
+ return 200, {}
except InvalidClientCredentialsError:
# fallback to default user behaviour if they aren't an AS
pass
@@ -118,7 +118,7 @@ class ClientDirectoryServer(RestServlet):
"User %s deleted alias %s", user.to_string(), room_alias.to_string()
)
- return (200, {})
+ return 200, {}
class ClientDirectoryListServer(RestServlet):
@@ -136,7 +136,7 @@ class ClientDirectoryListServer(RestServlet):
if room is None:
raise NotFoundError("Unknown room")
- return (200, {"visibility": "public" if room["is_public"] else "private"})
+ return 200, {"visibility": "public" if room["is_public"] else "private"}
@defer.inlineCallbacks
def on_PUT(self, request, room_id):
@@ -149,7 +149,7 @@ class ClientDirectoryListServer(RestServlet):
requester, room_id, visibility
)
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_DELETE(self, request, room_id):
@@ -159,7 +159,7 @@ class ClientDirectoryListServer(RestServlet):
requester, room_id, "private"
)
- return (200, {})
+ return 200, {}
class ClientAppserviceDirectoryListServer(RestServlet):
@@ -193,4 +193,4 @@ class ClientAppserviceDirectoryListServer(RestServlet):
requester.app_service.id, network_id, room_id, visibility
)
- return (200, {})
+ return 200, {}
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index 53ebed2203..6651b4cf07 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -67,10 +67,10 @@ class EventStreamRestServlet(RestServlet):
is_guest=is_guest,
)
- return (200, chunk)
+ return 200, chunk
def on_OPTIONS(self, request):
- return (200, {})
+ return 200, {}
# TODO: Unit test gets, with and without auth, with different kinds of events.
@@ -91,9 +91,9 @@ class EventRestServlet(RestServlet):
time_now = self.clock.time_msec()
if event:
event = yield self._event_serializer.serialize_event(event, time_now)
- return (200, event)
+ return 200, event
else:
- return (404, "Event not found.")
+ return 404, "Event not found."
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py
index 70b8478e90..2da3cd7511 100644
--- a/synapse/rest/client/v1/initial_sync.py
+++ b/synapse/rest/client/v1/initial_sync.py
@@ -42,7 +42,7 @@ class InitialSyncRestServlet(RestServlet):
include_archived=include_archived,
)
- return (200, content)
+ return 200, content
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 5762b9fd06..25a1b67092 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -121,10 +121,10 @@ class LoginRestServlet(RestServlet):
({"type": t} for t in self.auth_handler.get_supported_login_types())
)
- return (200, {"flows": flows})
+ return 200, {"flows": flows}
def on_OPTIONS(self, request):
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_POST(self, request):
@@ -152,7 +152,7 @@ class LoginRestServlet(RestServlet):
well_known_data = self._well_known_builder.get_well_known()
if well_known_data:
result["well_known"] = well_known_data
- return (200, result)
+ return 200, result
@defer.inlineCallbacks
def _do_other_login(self, login_submission):
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index 2769f3a189..4785a34d75 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -33,7 +33,7 @@ class LogoutRestServlet(RestServlet):
self._device_handler = hs.get_device_handler()
def on_OPTIONS(self, request):
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_POST(self, request):
@@ -49,7 +49,7 @@ class LogoutRestServlet(RestServlet):
requester.user.to_string(), requester.device_id
)
- return (200, {})
+ return 200, {}
class LogoutAllRestServlet(RestServlet):
@@ -62,7 +62,7 @@ class LogoutAllRestServlet(RestServlet):
self._device_handler = hs.get_device_handler()
def on_OPTIONS(self, request):
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_POST(self, request):
@@ -75,7 +75,7 @@ class LogoutAllRestServlet(RestServlet):
# .. and then delete any access tokens which weren't associated with
# devices.
yield self._auth_handler.delete_access_tokens_for_user(user_id)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index 1eb1068c98..0153525cef 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -56,7 +56,7 @@ class PresenceStatusRestServlet(RestServlet):
state = yield self.presence_handler.get_state(target_user=user)
state = format_user_presence_state(state, self.clock.time_msec())
- return (200, state)
+ return 200, state
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
@@ -88,10 +88,10 @@ class PresenceStatusRestServlet(RestServlet):
if self.hs.config.use_presence:
yield self.presence_handler.set_state(user, state)
- return (200, {})
+ return 200, {}
def on_OPTIONS(self, request):
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index 2657ae45bb..bbce2e2b71 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -48,7 +48,7 @@ class ProfileDisplaynameRestServlet(RestServlet):
if displayname is not None:
ret["displayname"] = displayname
- return (200, ret)
+ return 200, ret
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
@@ -61,14 +61,14 @@ class ProfileDisplaynameRestServlet(RestServlet):
try:
new_name = content["displayname"]
except Exception:
- return (400, "Unable to parse name")
+ return 400, "Unable to parse name"
yield self.profile_handler.set_displayname(user, requester, new_name, is_admin)
- return (200, {})
+ return 200, {}
def on_OPTIONS(self, request, user_id):
- return (200, {})
+ return 200, {}
class ProfileAvatarURLRestServlet(RestServlet):
@@ -98,7 +98,7 @@ class ProfileAvatarURLRestServlet(RestServlet):
if avatar_url is not None:
ret["avatar_url"] = avatar_url
- return (200, ret)
+ return 200, ret
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
@@ -110,14 +110,14 @@ class ProfileAvatarURLRestServlet(RestServlet):
try:
new_name = content["avatar_url"]
except Exception:
- return (400, "Unable to parse name")
+ return 400, "Unable to parse name"
yield self.profile_handler.set_avatar_url(user, requester, new_name, is_admin)
- return (200, {})
+ return 200, {}
def on_OPTIONS(self, request, user_id):
- return (200, {})
+ return 200, {}
class ProfileRestServlet(RestServlet):
@@ -150,7 +150,7 @@ class ProfileRestServlet(RestServlet):
if avatar_url is not None:
ret["avatar_url"] = avatar_url
- return (200, ret)
+ return 200, ret
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index c3ae8b98a8..9f8c3d09e3 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -69,7 +69,7 @@ class PushRuleRestServlet(RestServlet):
if "attr" in spec:
yield self.set_rule_attr(user_id, spec, content)
self.notify_user(user_id)
- return (200, {})
+ return 200, {}
if spec["rule_id"].startswith("."):
# Rule ids starting with '.' are reserved for server default rules.
@@ -106,7 +106,7 @@ class PushRuleRestServlet(RestServlet):
except RuleNotFoundException as e:
raise SynapseError(400, str(e))
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_DELETE(self, request, path):
@@ -123,7 +123,7 @@ class PushRuleRestServlet(RestServlet):
try:
yield self.store.delete_push_rule(user_id, namespaced_rule_id)
self.notify_user(user_id)
- return (200, {})
+ return 200, {}
except StoreError as e:
if e.code == 404:
raise NotFoundError()
@@ -151,10 +151,10 @@ class PushRuleRestServlet(RestServlet):
)
if path[0] == "":
- return (200, rules)
+ return 200, rules
elif path[0] == "global":
result = _filter_ruleset_with_path(rules["global"], path[1:])
- return (200, result)
+ return 200, result
else:
raise UnrecognizedRequestError()
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index ebc3dec516..41660682d9 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -62,7 +62,7 @@ class PushersRestServlet(RestServlet):
if k not in allowed_keys:
del p[k]
- return (200, {"pushers": pushers})
+ return 200, {"pushers": pushers}
def on_OPTIONS(self, _):
return 200, {}
@@ -94,7 +94,7 @@ class PushersSetRestServlet(RestServlet):
yield self.pusher_pool.remove_pusher(
content["app_id"], content["pushkey"], user_id=user.to_string()
)
- return (200, {})
+ return 200, {}
assert_params_in_dict(
content,
@@ -143,7 +143,7 @@ class PushersSetRestServlet(RestServlet):
self.notifier.on_new_replication_data()
- return (200, {})
+ return 200, {}
def on_OPTIONS(self, _):
return 200, {}
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 4b2344e696..3582259026 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -91,14 +91,14 @@ class RoomCreateRestServlet(TransactionRestServlet):
requester, self.get_room_config(request)
)
- return (200, info)
+ return 200, info
def get_room_config(self, request):
user_supplied_config = parse_json_object_from_request(request)
return user_supplied_config
def on_OPTIONS(self, request):
- return (200, {})
+ return 200, {}
# TODO: Needs unit testing for generic events
@@ -173,9 +173,9 @@ class RoomStateEventRestServlet(TransactionRestServlet):
if format == "event":
event = format_event_for_client_v2(data.get_dict())
- return (200, event)
+ return 200, event
elif format == "content":
- return (200, data.get_dict()["content"])
+ return 200, data.get_dict()["content"]
@defer.inlineCallbacks
def on_PUT(self, request, room_id, event_type, state_key, txn_id=None):
@@ -210,7 +210,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
ret = {}
if event:
ret = {"event_id": event.event_id}
- return (200, ret)
+ return 200, ret
# TODO: Needs unit testing for generic events + feedback
@@ -244,10 +244,10 @@ class RoomSendEventRestServlet(TransactionRestServlet):
requester, event_dict, txn_id=txn_id
)
- return (200, {"event_id": event.event_id})
+ return 200, {"event_id": event.event_id}
def on_GET(self, request, room_id, event_type, txn_id):
- return (200, "Not implemented")
+ return 200, "Not implemented"
def on_PUT(self, request, room_id, event_type, txn_id):
return self.txns.fetch_or_execute_request(
@@ -307,7 +307,7 @@ class JoinRoomAliasServlet(TransactionRestServlet):
third_party_signed=content.get("third_party_signed", None),
)
- return (200, {"room_id": room_id})
+ return 200, {"room_id": room_id}
def on_PUT(self, request, room_identifier, txn_id):
return self.txns.fetch_or_execute_request(
@@ -360,7 +360,7 @@ class PublicRoomListRestServlet(TransactionRestServlet):
limit=limit, since_token=since_token
)
- return (200, data)
+ return 200, data
@defer.inlineCallbacks
def on_POST(self, request):
@@ -405,7 +405,7 @@ class PublicRoomListRestServlet(TransactionRestServlet):
network_tuple=network_tuple,
)
- return (200, data)
+ return 200, data
# TODO: Needs unit testing
@@ -456,7 +456,7 @@ class RoomMemberListRestServlet(RestServlet):
continue
chunk.append(event)
- return (200, {"chunk": chunk})
+ return 200, {"chunk": chunk}
# deprecated in favour of /members?membership=join?
@@ -477,7 +477,7 @@ class JoinedRoomMemberListRestServlet(RestServlet):
requester, room_id
)
- return (200, {"joined": users_with_profile})
+ return 200, {"joined": users_with_profile}
# TODO: Needs better unit testing
@@ -510,7 +510,7 @@ class RoomMessageListRestServlet(RestServlet):
event_filter=event_filter,
)
- return (200, msgs)
+ return 200, msgs
# TODO: Needs unit testing
@@ -531,7 +531,7 @@ class RoomStateRestServlet(RestServlet):
user_id=requester.user.to_string(),
is_guest=requester.is_guest,
)
- return (200, events)
+ return 200, events
# TODO: Needs unit testing
@@ -550,7 +550,7 @@ class RoomInitialSyncRestServlet(RestServlet):
content = yield self.initial_sync_handler.room_initial_sync(
room_id=room_id, requester=requester, pagin_config=pagination_config
)
- return (200, content)
+ return 200, content
class RoomEventServlet(RestServlet):
@@ -581,7 +581,7 @@ class RoomEventServlet(RestServlet):
time_now = self.clock.time_msec()
if event:
event = yield self._event_serializer.serialize_event(event, time_now)
- return (200, event)
+ return 200, event
return SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
@@ -633,7 +633,7 @@ class RoomEventContextServlet(RestServlet):
results["state"], time_now
)
- return (200, results)
+ return 200, results
class RoomForgetRestServlet(TransactionRestServlet):
@@ -652,7 +652,7 @@ class RoomForgetRestServlet(TransactionRestServlet):
yield self.room_member_handler.forget(user=requester.user, room_id=room_id)
- return (200, {})
+ return 200, {}
def on_PUT(self, request, room_id, txn_id):
return self.txns.fetch_or_execute_request(
@@ -702,8 +702,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
requester,
txn_id,
)
- return (200, {})
- return
+ return 200, {}
target = requester.user
if membership_action in ["invite", "ban", "unban", "kick"]:
@@ -729,7 +728,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
if membership_action == "join":
return_value["room_id"] = room_id
- return (200, return_value)
+ return 200, return_value
def _has_3pid_invite_keys(self, content):
for key in {"id_server", "medium", "address"}:
@@ -771,7 +770,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
txn_id=txn_id,
)
- return (200, {"event_id": event.event_id})
+ return 200, {"event_id": event.event_id}
def on_PUT(self, request, room_id, event_id, txn_id):
return self.txns.fetch_or_execute_request(
@@ -816,7 +815,7 @@ class RoomTypingRestServlet(RestServlet):
target_user=target_user, auth_user=requester.user, room_id=room_id
)
- return (200, {})
+ return 200, {}
class SearchRestServlet(RestServlet):
@@ -838,7 +837,7 @@ class SearchRestServlet(RestServlet):
requester.user, content, batch
)
- return (200, results)
+ return 200, results
class JoinedRoomsRestServlet(RestServlet):
@@ -854,7 +853,7 @@ class JoinedRoomsRestServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
room_ids = yield self.store.get_rooms_for_user(requester.user.to_string())
- return (200, {"joined_rooms": list(room_ids)})
+ return 200, {"joined_rooms": list(room_ids)}
def register_txn_path(servlet, regex_string, http_server, with_get=False):
diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py
index 497cddf8b8..2afdbb89e5 100644
--- a/synapse/rest/client/v1/voip.py
+++ b/synapse/rest/client/v1/voip.py
@@ -60,7 +60,7 @@ class VoipRestServlet(RestServlet):
password = turnPassword
else:
- return (200, {})
+ return 200, {}
return (
200,
@@ -73,7 +73,7 @@ class VoipRestServlet(RestServlet):
)
def on_OPTIONS(self, request):
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 934ed5d16d..e9cc953bdd 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -117,7 +117,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
# Wrap the session id in a JSON object
ret = {"sid": sid}
- return (200, ret)
+ return 200, ret
@defer.inlineCallbacks
def send_password_reset(self, email, client_secret, send_attempt, next_link=None):
@@ -221,7 +221,7 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
raise SynapseError(400, "MSISDN not found", Codes.THREEPID_NOT_FOUND)
ret = yield self.identity_handler.requestMsisdnToken(**body)
- return (200, ret)
+ return 200, ret
class PasswordResetSubmitTokenServlet(RestServlet):
@@ -330,7 +330,7 @@ class PasswordResetSubmitTokenServlet(RestServlet):
)
response_code = 200 if valid else 400
- return (response_code, {"success": valid})
+ return response_code, {"success": valid}
class PasswordRestServlet(RestServlet):
@@ -399,7 +399,7 @@ class PasswordRestServlet(RestServlet):
yield self._set_password_handler.set_password(user_id, new_password, requester)
- return (200, {})
+ return 200, {}
def on_OPTIONS(self, _):
return 200, {}
@@ -434,7 +434,7 @@ class DeactivateAccountRestServlet(RestServlet):
yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string(), erase
)
- return (200, {})
+ return 200, {}
yield self.auth_handler.validate_user_via_ui_auth(
requester, body, self.hs.get_ip_from_request(request)
@@ -447,7 +447,7 @@ class DeactivateAccountRestServlet(RestServlet):
else:
id_server_unbind_result = "no-support"
- return (200, {"id_server_unbind_result": id_server_unbind_result})
+ return 200, {"id_server_unbind_result": id_server_unbind_result}
class EmailThreepidRequestTokenRestServlet(RestServlet):
@@ -481,7 +481,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
ret = yield self.identity_handler.requestEmailToken(**body)
- return (200, ret)
+ return 200, ret
class MsisdnThreepidRequestTokenRestServlet(RestServlet):
@@ -516,7 +516,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
raise SynapseError(400, "MSISDN is already in use", Codes.THREEPID_IN_USE)
ret = yield self.identity_handler.requestMsisdnToken(**body)
- return (200, ret)
+ return 200, ret
class ThreepidRestServlet(RestServlet):
@@ -536,21 +536,22 @@ class ThreepidRestServlet(RestServlet):
threepids = yield self.datastore.user_get_threepids(requester.user.to_string())
- return (200, {"threepids": threepids})
+ return 200, {"threepids": threepids}
@defer.inlineCallbacks
def on_POST(self, request):
body = parse_json_object_from_request(request)
- threePidCreds = body.get("threePidCreds")
- threePidCreds = body.get("three_pid_creds", threePidCreds)
- if threePidCreds is None:
- raise SynapseError(400, "Missing param", Codes.MISSING_PARAM)
+ threepid_creds = body.get("threePidCreds") or body.get("three_pid_creds")
+ if threepid_creds is None:
+ raise SynapseError(
+ 400, "Missing param three_pid_creds", Codes.MISSING_PARAM
+ )
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
- threepid = yield self.identity_handler.threepid_from_creds(threePidCreds)
+ threepid = yield self.identity_handler.threepid_from_creds(threepid_creds)
if not threepid:
raise SynapseError(400, "Failed to auth 3pid", Codes.THREEPID_AUTH_FAILED)
@@ -566,9 +567,41 @@ class ThreepidRestServlet(RestServlet):
if "bind" in body and body["bind"]:
logger.debug("Binding threepid %s to %s", threepid, user_id)
- yield self.identity_handler.bind_threepid(threePidCreds, user_id)
+ yield self.identity_handler.bind_threepid(threepid_creds, user_id)
+
+ return 200, {}
+
+
+class ThreepidUnbindRestServlet(RestServlet):
+ PATTERNS = client_patterns("/account/3pid/unbind$")
+
+ def __init__(self, hs):
+ super(ThreepidUnbindRestServlet, self).__init__()
+ self.hs = hs
+ self.identity_handler = hs.get_handlers().identity_handler
+ self.auth = hs.get_auth()
+ self.datastore = self.hs.get_datastore()
- return (200, {})
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ """Unbind the given 3pid from a specific identity server, or identity servers that are
+ known to have this 3pid bound
+ """
+ requester = yield self.auth.get_user_by_req(request)
+ body = parse_json_object_from_request(request)
+ assert_params_in_dict(body, ["medium", "address"])
+
+ medium = body.get("medium")
+ address = body.get("address")
+ id_server = body.get("id_server")
+
+ # Attempt to unbind the threepid from an identity server. If id_server is None, try to
+ # unbind from all identity servers this threepid has been added to in the past
+ result = yield self.identity_handler.try_unbind_threepid(
+ requester.user.to_string(),
+ {"address": address, "medium": medium, "id_server": id_server},
+ )
+ return 200, {"id_server_unbind_result": "success" if result else "no-support"}
class ThreepidDeleteRestServlet(RestServlet):
@@ -603,7 +636,7 @@ class ThreepidDeleteRestServlet(RestServlet):
else:
id_server_unbind_result = "no-support"
- return (200, {"id_server_unbind_result": id_server_unbind_result})
+ return 200, {"id_server_unbind_result": id_server_unbind_result}
class WhoamiRestServlet(RestServlet):
@@ -617,7 +650,7 @@ class WhoamiRestServlet(RestServlet):
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request)
- return (200, {"user_id": requester.user.to_string()})
+ return 200, {"user_id": requester.user.to_string()}
def register_servlets(hs, http_server):
@@ -629,5 +662,6 @@ def register_servlets(hs, http_server):
EmailThreepidRequestTokenRestServlet(hs).register(http_server)
MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
ThreepidRestServlet(hs).register(http_server)
+ ThreepidUnbindRestServlet(hs).register(http_server)
ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py
index 98f2f6f4b5..f0db204ffa 100644
--- a/synapse/rest/client/v2_alpha/account_data.py
+++ b/synapse/rest/client/v2_alpha/account_data.py
@@ -55,7 +55,7 @@ class AccountDataServlet(RestServlet):
self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_GET(self, request, user_id, account_data_type):
@@ -70,7 +70,7 @@ class AccountDataServlet(RestServlet):
if event is None:
raise NotFoundError("Account data not found")
- return (200, event)
+ return 200, event
class RoomAccountDataServlet(RestServlet):
@@ -112,7 +112,7 @@ class RoomAccountDataServlet(RestServlet):
self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_GET(self, request, user_id, room_id, account_data_type):
@@ -127,7 +127,7 @@ class RoomAccountDataServlet(RestServlet):
if event is None:
raise NotFoundError("Room account data not found")
- return (200, event)
+ return 200, event
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/capabilities.py b/synapse/rest/client/v2_alpha/capabilities.py
index a4fa45fe11..acd58af193 100644
--- a/synapse/rest/client/v2_alpha/capabilities.py
+++ b/synapse/rest/client/v2_alpha/capabilities.py
@@ -58,7 +58,7 @@ class CapabilitiesRestServlet(RestServlet):
"m.change_password": {"enabled": change_password},
}
}
- return (200, response)
+ return 200, response
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py
index 9adf76cc0c..26d0235208 100644
--- a/synapse/rest/client/v2_alpha/devices.py
+++ b/synapse/rest/client/v2_alpha/devices.py
@@ -48,7 +48,7 @@ class DevicesRestServlet(RestServlet):
devices = yield self.device_handler.get_devices_by_user(
requester.user.to_string()
)
- return (200, {"devices": devices})
+ return 200, {"devices": devices}
class DeleteDevicesRestServlet(RestServlet):
@@ -91,7 +91,7 @@ class DeleteDevicesRestServlet(RestServlet):
yield self.device_handler.delete_devices(
requester.user.to_string(), body["devices"]
)
- return (200, {})
+ return 200, {}
class DeviceRestServlet(RestServlet):
@@ -114,7 +114,7 @@ class DeviceRestServlet(RestServlet):
device = yield self.device_handler.get_device(
requester.user.to_string(), device_id
)
- return (200, device)
+ return 200, device
@interactive_auth_handler
@defer.inlineCallbacks
@@ -137,7 +137,7 @@ class DeviceRestServlet(RestServlet):
)
yield self.device_handler.delete_device(requester.user.to_string(), device_id)
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_PUT(self, request, device_id):
@@ -147,7 +147,7 @@ class DeviceRestServlet(RestServlet):
yield self.device_handler.update_device(
requester.user.to_string(), device_id, body
)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py
index 22be0ee3c5..c6ddf24c8d 100644
--- a/synapse/rest/client/v2_alpha/filter.py
+++ b/synapse/rest/client/v2_alpha/filter.py
@@ -56,7 +56,7 @@ class GetFilterRestServlet(RestServlet):
user_localpart=target_user.localpart, filter_id=filter_id
)
- return (200, filter.get_filter_json())
+ return 200, filter.get_filter_json()
except (KeyError, StoreError):
raise SynapseError(400, "No such filter", errcode=Codes.NOT_FOUND)
@@ -89,7 +89,7 @@ class CreateFilterRestServlet(RestServlet):
user_localpart=target_user.localpart, user_filter=content
)
- return (200, {"filter_id": str(filter_id)})
+ return 200, {"filter_id": str(filter_id)}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py
index e629c4256d..999a0fa80c 100644
--- a/synapse/rest/client/v2_alpha/groups.py
+++ b/synapse/rest/client/v2_alpha/groups.py
@@ -47,7 +47,7 @@ class GroupServlet(RestServlet):
group_id, requester_user_id
)
- return (200, group_description)
+ return 200, group_description
@defer.inlineCallbacks
def on_POST(self, request, group_id):
@@ -59,7 +59,7 @@ class GroupServlet(RestServlet):
group_id, requester_user_id, content
)
- return (200, {})
+ return 200, {}
class GroupSummaryServlet(RestServlet):
@@ -83,7 +83,7 @@ class GroupSummaryServlet(RestServlet):
group_id, requester_user_id
)
- return (200, get_group_summary)
+ return 200, get_group_summary
class GroupSummaryRoomsCatServlet(RestServlet):
@@ -120,7 +120,7 @@ class GroupSummaryRoomsCatServlet(RestServlet):
content=content,
)
- return (200, resp)
+ return 200, resp
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, category_id, room_id):
@@ -131,7 +131,7 @@ class GroupSummaryRoomsCatServlet(RestServlet):
group_id, requester_user_id, room_id=room_id, category_id=category_id
)
- return (200, resp)
+ return 200, resp
class GroupCategoryServlet(RestServlet):
@@ -157,7 +157,7 @@ class GroupCategoryServlet(RestServlet):
group_id, requester_user_id, category_id=category_id
)
- return (200, category)
+ return 200, category
@defer.inlineCallbacks
def on_PUT(self, request, group_id, category_id):
@@ -169,7 +169,7 @@ class GroupCategoryServlet(RestServlet):
group_id, requester_user_id, category_id=category_id, content=content
)
- return (200, resp)
+ return 200, resp
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, category_id):
@@ -180,7 +180,7 @@ class GroupCategoryServlet(RestServlet):
group_id, requester_user_id, category_id=category_id
)
- return (200, resp)
+ return 200, resp
class GroupCategoriesServlet(RestServlet):
@@ -204,7 +204,7 @@ class GroupCategoriesServlet(RestServlet):
group_id, requester_user_id
)
- return (200, category)
+ return 200, category
class GroupRoleServlet(RestServlet):
@@ -228,7 +228,7 @@ class GroupRoleServlet(RestServlet):
group_id, requester_user_id, role_id=role_id
)
- return (200, category)
+ return 200, category
@defer.inlineCallbacks
def on_PUT(self, request, group_id, role_id):
@@ -240,7 +240,7 @@ class GroupRoleServlet(RestServlet):
group_id, requester_user_id, role_id=role_id, content=content
)
- return (200, resp)
+ return 200, resp
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, role_id):
@@ -251,7 +251,7 @@ class GroupRoleServlet(RestServlet):
group_id, requester_user_id, role_id=role_id
)
- return (200, resp)
+ return 200, resp
class GroupRolesServlet(RestServlet):
@@ -275,7 +275,7 @@ class GroupRolesServlet(RestServlet):
group_id, requester_user_id
)
- return (200, category)
+ return 200, category
class GroupSummaryUsersRoleServlet(RestServlet):
@@ -312,7 +312,7 @@ class GroupSummaryUsersRoleServlet(RestServlet):
content=content,
)
- return (200, resp)
+ return 200, resp
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, role_id, user_id):
@@ -323,7 +323,7 @@ class GroupSummaryUsersRoleServlet(RestServlet):
group_id, requester_user_id, user_id=user_id, role_id=role_id
)
- return (200, resp)
+ return 200, resp
class GroupRoomServlet(RestServlet):
@@ -347,7 +347,7 @@ class GroupRoomServlet(RestServlet):
group_id, requester_user_id
)
- return (200, result)
+ return 200, result
class GroupUsersServlet(RestServlet):
@@ -371,7 +371,7 @@ class GroupUsersServlet(RestServlet):
group_id, requester_user_id
)
- return (200, result)
+ return 200, result
class GroupInvitedUsersServlet(RestServlet):
@@ -395,7 +395,7 @@ class GroupInvitedUsersServlet(RestServlet):
group_id, requester_user_id
)
- return (200, result)
+ return 200, result
class GroupSettingJoinPolicyServlet(RestServlet):
@@ -420,7 +420,7 @@ class GroupSettingJoinPolicyServlet(RestServlet):
group_id, requester_user_id, content
)
- return (200, result)
+ return 200, result
class GroupCreateServlet(RestServlet):
@@ -450,7 +450,7 @@ class GroupCreateServlet(RestServlet):
group_id, requester_user_id, content
)
- return (200, result)
+ return 200, result
class GroupAdminRoomsServlet(RestServlet):
@@ -477,7 +477,7 @@ class GroupAdminRoomsServlet(RestServlet):
group_id, requester_user_id, room_id, content
)
- return (200, result)
+ return 200, result
@defer.inlineCallbacks
def on_DELETE(self, request, group_id, room_id):
@@ -488,7 +488,7 @@ class GroupAdminRoomsServlet(RestServlet):
group_id, requester_user_id, room_id
)
- return (200, result)
+ return 200, result
class GroupAdminRoomsConfigServlet(RestServlet):
@@ -516,7 +516,7 @@ class GroupAdminRoomsConfigServlet(RestServlet):
group_id, requester_user_id, room_id, config_key, content
)
- return (200, result)
+ return 200, result
class GroupAdminUsersInviteServlet(RestServlet):
@@ -546,7 +546,7 @@ class GroupAdminUsersInviteServlet(RestServlet):
group_id, user_id, requester_user_id, config
)
- return (200, result)
+ return 200, result
class GroupAdminUsersKickServlet(RestServlet):
@@ -573,7 +573,7 @@ class GroupAdminUsersKickServlet(RestServlet):
group_id, user_id, requester_user_id, content
)
- return (200, result)
+ return 200, result
class GroupSelfLeaveServlet(RestServlet):
@@ -598,7 +598,7 @@ class GroupSelfLeaveServlet(RestServlet):
group_id, requester_user_id, requester_user_id, content
)
- return (200, result)
+ return 200, result
class GroupSelfJoinServlet(RestServlet):
@@ -623,7 +623,7 @@ class GroupSelfJoinServlet(RestServlet):
group_id, requester_user_id, content
)
- return (200, result)
+ return 200, result
class GroupSelfAcceptInviteServlet(RestServlet):
@@ -648,7 +648,7 @@ class GroupSelfAcceptInviteServlet(RestServlet):
group_id, requester_user_id, content
)
- return (200, result)
+ return 200, result
class GroupSelfUpdatePublicityServlet(RestServlet):
@@ -672,7 +672,7 @@ class GroupSelfUpdatePublicityServlet(RestServlet):
publicise = content["publicise"]
yield self.store.update_group_publicity(group_id, requester_user_id, publicise)
- return (200, {})
+ return 200, {}
class PublicisedGroupsForUserServlet(RestServlet):
@@ -694,7 +694,7 @@ class PublicisedGroupsForUserServlet(RestServlet):
result = yield self.groups_handler.get_publicised_groups_for_user(user_id)
- return (200, result)
+ return 200, result
class PublicisedGroupsForUsersServlet(RestServlet):
@@ -719,7 +719,7 @@ class PublicisedGroupsForUsersServlet(RestServlet):
result = yield self.groups_handler.bulk_get_publicised_groups(user_ids)
- return (200, result)
+ return 200, result
class GroupsForUserServlet(RestServlet):
@@ -741,7 +741,7 @@ class GroupsForUserServlet(RestServlet):
result = yield self.groups_handler.get_joined_groups(requester_user_id)
- return (200, result)
+ return 200, result
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index b218a3f334..2e680134a0 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -24,7 +24,7 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
-from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
+from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import StreamToken
from ._base import client_patterns
@@ -69,7 +69,7 @@ class KeyUploadServlet(RestServlet):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
- @trace_using_operation_name("upload_keys")
+ @trace(opname="upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -105,7 +105,7 @@ class KeyUploadServlet(RestServlet):
result = yield self.e2e_keys_handler.upload_keys_for_user(
user_id, device_id, body
)
- return (200, result)
+ return 200, result
class KeyQueryServlet(RestServlet):
@@ -159,7 +159,7 @@ class KeyQueryServlet(RestServlet):
timeout = parse_integer(request, "timeout", 10 * 1000)
body = parse_json_object_from_request(request)
result = yield self.e2e_keys_handler.query_devices(body, timeout)
- return (200, result)
+ return 200, result
class KeyChangesServlet(RestServlet):
@@ -200,7 +200,7 @@ class KeyChangesServlet(RestServlet):
results = yield self.device_handler.get_user_ids_changed(user_id, from_token)
- return (200, results)
+ return 200, results
class OneTimeKeyServlet(RestServlet):
@@ -235,7 +235,7 @@ class OneTimeKeyServlet(RestServlet):
timeout = parse_integer(request, "timeout", 10 * 1000)
body = parse_json_object_from_request(request)
result = yield self.e2e_keys_handler.claim_one_time_keys(body, timeout)
- return (200, result)
+ return 200, result
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py
index d034863a3c..10c1ad5b07 100644
--- a/synapse/rest/client/v2_alpha/notifications.py
+++ b/synapse/rest/client/v2_alpha/notifications.py
@@ -88,7 +88,7 @@ class NotificationsServlet(RestServlet):
returned_push_actions.append(returned_pa)
next_token = str(pa["stream_ordering"])
- return (200, {"notifications": returned_push_actions, "next_token": next_token})
+ return 200, {"notifications": returned_push_actions, "next_token": next_token}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py
index d93d6a9f24..b3bf8567e1 100644
--- a/synapse/rest/client/v2_alpha/read_marker.py
+++ b/synapse/rest/client/v2_alpha/read_marker.py
@@ -59,7 +59,7 @@ class ReadMarkerRestServlet(RestServlet):
event_id=read_marker_event_id,
)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py
index 98a97b7059..0dab03d227 100644
--- a/synapse/rest/client/v2_alpha/receipts.py
+++ b/synapse/rest/client/v2_alpha/receipts.py
@@ -52,7 +52,7 @@ class ReceiptRestServlet(RestServlet):
room_id, receipt_type, user_id=requester.user.to_string(), event_id=event_id
)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 9510a1e2b0..1ccd2bed2f 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -94,7 +94,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
ret = yield self.identity_handler.requestEmailToken(**body)
- return (200, ret)
+ return 200, ret
class MsisdnRegisterRequestTokenRestServlet(RestServlet):
@@ -137,7 +137,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
)
ret = yield self.identity_handler.requestMsisdnToken(**body)
- return (200, ret)
+ return 200, ret
class UsernameAvailabilityRestServlet(RestServlet):
@@ -177,7 +177,7 @@ class UsernameAvailabilityRestServlet(RestServlet):
yield self.registration_handler.check_username(username)
- return (200, {"available": True})
+ return 200, {"available": True}
class RegisterRestServlet(RestServlet):
@@ -230,7 +230,6 @@ class RegisterRestServlet(RestServlet):
if kind == b"guest":
ret = yield self._do_guest_registration(body, address=client_addr)
return ret
- return
elif kind != b"user":
raise UnrecognizedRequestError(
"Do not understand membership kind: %s" % (kind,)
@@ -279,8 +278,7 @@ class RegisterRestServlet(RestServlet):
result = yield self._do_appservice_registration(
desired_username, access_token, body
)
- return (200, result) # we throw for non 200 responses
- return
+ return 200, result # we throw for non 200 responses
# for regular registration, downcase the provided username before
# attempting to register it. This should mean
@@ -483,11 +481,9 @@ class RegisterRestServlet(RestServlet):
user_id=registered_user_id,
auth_result=auth_result,
access_token=return_dict.get("access_token"),
- bind_email=params.get("bind_email"),
- bind_msisdn=params.get("bind_msisdn"),
)
- return (200, return_dict)
+ return 200, return_dict
def on_OPTIONS(self, _):
return 200, {}
diff --git a/synapse/rest/client/v2_alpha/relations.py b/synapse/rest/client/v2_alpha/relations.py
index 1538b247e5..040b37c504 100644
--- a/synapse/rest/client/v2_alpha/relations.py
+++ b/synapse/rest/client/v2_alpha/relations.py
@@ -118,7 +118,7 @@ class RelationSendServlet(RestServlet):
requester, event_dict=event_dict, txn_id=txn_id
)
- return (200, {"event_id": event.event_id})
+ return 200, {"event_id": event.event_id}
class RelationPaginationServlet(RestServlet):
@@ -198,7 +198,7 @@ class RelationPaginationServlet(RestServlet):
return_value["chunk"] = events
return_value["original_event"] = original_event
- return (200, return_value)
+ return 200, return_value
class RelationAggregationPaginationServlet(RestServlet):
@@ -270,7 +270,7 @@ class RelationAggregationPaginationServlet(RestServlet):
to_token=to_token,
)
- return (200, pagination_chunk.to_dict())
+ return 200, pagination_chunk.to_dict()
class RelationAggregationGroupPaginationServlet(RestServlet):
@@ -356,7 +356,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet):
return_value = result.to_dict()
return_value["chunk"] = events
- return (200, return_value)
+ return 200, return_value
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py
index 3fdd4584a3..e7449864cd 100644
--- a/synapse/rest/client/v2_alpha/report_event.py
+++ b/synapse/rest/client/v2_alpha/report_event.py
@@ -72,7 +72,7 @@ class ReportEventRestServlet(RestServlet):
received_ts=self.clock.time_msec(),
)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py
index 10dec96208..df4f44cd36 100644
--- a/synapse/rest/client/v2_alpha/room_keys.py
+++ b/synapse/rest/client/v2_alpha/room_keys.py
@@ -135,7 +135,7 @@ class RoomKeysServlet(RestServlet):
body = {"rooms": {room_id: body}}
yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body)
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_GET(self, request, room_id, session_id):
@@ -218,7 +218,7 @@ class RoomKeysServlet(RestServlet):
else:
room_keys = room_keys["rooms"][room_id]
- return (200, room_keys)
+ return 200, room_keys
@defer.inlineCallbacks
def on_DELETE(self, request, room_id, session_id):
@@ -242,7 +242,7 @@ class RoomKeysServlet(RestServlet):
yield self.e2e_room_keys_handler.delete_room_keys(
user_id, version, room_id, session_id
)
- return (200, {})
+ return 200, {}
class RoomKeysNewVersionServlet(RestServlet):
@@ -293,7 +293,7 @@ class RoomKeysNewVersionServlet(RestServlet):
info = parse_json_object_from_request(request)
new_version = yield self.e2e_room_keys_handler.create_version(user_id, info)
- return (200, {"version": new_version})
+ return 200, {"version": new_version}
# we deliberately don't have a PUT /version, as these things really should
# be immutable to avoid people footgunning
@@ -338,7 +338,7 @@ class RoomKeysVersionServlet(RestServlet):
except SynapseError as e:
if e.code == 404:
raise SynapseError(404, "No backup found", Codes.NOT_FOUND)
- return (200, info)
+ return 200, info
@defer.inlineCallbacks
def on_DELETE(self, request, version):
@@ -358,7 +358,7 @@ class RoomKeysVersionServlet(RestServlet):
user_id = requester.user.to_string()
yield self.e2e_room_keys_handler.delete_version(user_id, version)
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_PUT(self, request, version):
@@ -392,7 +392,7 @@ class RoomKeysVersionServlet(RestServlet):
)
yield self.e2e_room_keys_handler.update_version(user_id, version, info)
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
index 14ba61a63e..d2c3316eb7 100644
--- a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
+++ b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
@@ -80,7 +80,7 @@ class RoomUpgradeRestServlet(RestServlet):
ret = {"replacement_room": new_room_id}
- return (200, ret)
+ return 200, ret
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py
index 2613648d82..d90e52ed1a 100644
--- a/synapse/rest/client/v2_alpha/sendtodevice.py
+++ b/synapse/rest/client/v2_alpha/sendtodevice.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from synapse.http import servlet
from synapse.http.servlet import parse_json_object_from_request
+from synapse.logging.opentracing import set_tag, trace
from synapse.rest.client.transactions import HttpTransactionCache
from ._base import client_patterns
@@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
self.txns = HttpTransactionCache(hs)
self.device_message_handler = hs.get_device_message_handler()
+ @trace(opname="sendToDevice")
def on_PUT(self, request, message_type, txn_id):
+ set_tag("message_type", message_type)
+ set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 7b32dd2212..c98c5a3802 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -174,7 +174,7 @@ class SyncRestServlet(RestServlet):
time_now, sync_result, requester.access_token_id, filter
)
- return (200, response_content)
+ return 200, response_content
@defer.inlineCallbacks
def encode_response(self, time_now, sync_result, access_token_id, filter):
diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py
index d173544355..3b555669a0 100644
--- a/synapse/rest/client/v2_alpha/tags.py
+++ b/synapse/rest/client/v2_alpha/tags.py
@@ -45,7 +45,7 @@ class TagListServlet(RestServlet):
tags = yield self.store.get_tags_for_room(user_id, room_id)
- return (200, {"tags": tags})
+ return 200, {"tags": tags}
class TagServlet(RestServlet):
@@ -76,7 +76,7 @@ class TagServlet(RestServlet):
self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
- return (200, {})
+ return 200, {}
@defer.inlineCallbacks
def on_DELETE(self, request, user_id, room_id, tag):
@@ -88,7 +88,7 @@ class TagServlet(RestServlet):
self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
- return (200, {})
+ return 200, {}
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py
index 158e686b01..2e8d672471 100644
--- a/synapse/rest/client/v2_alpha/thirdparty.py
+++ b/synapse/rest/client/v2_alpha/thirdparty.py
@@ -40,7 +40,7 @@ class ThirdPartyProtocolsServlet(RestServlet):
yield self.auth.get_user_by_req(request, allow_guest=True)
protocols = yield self.appservice_handler.get_3pe_protocols()
- return (200, protocols)
+ return 200, protocols
class ThirdPartyProtocolServlet(RestServlet):
@@ -60,9 +60,9 @@ class ThirdPartyProtocolServlet(RestServlet):
only_protocol=protocol
)
if protocol in protocols:
- return (200, protocols[protocol])
+ return 200, protocols[protocol]
else:
- return (404, {"error": "Unknown protocol"})
+ return 404, {"error": "Unknown protocol"}
class ThirdPartyUserServlet(RestServlet):
@@ -85,7 +85,7 @@ class ThirdPartyUserServlet(RestServlet):
ThirdPartyEntityKind.USER, protocol, fields
)
- return (200, results)
+ return 200, results
class ThirdPartyLocationServlet(RestServlet):
@@ -108,7 +108,7 @@ class ThirdPartyLocationServlet(RestServlet):
ThirdPartyEntityKind.LOCATION, protocol, fields
)
- return (200, results)
+ return 200, results
def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py
index 7ab2b80e46..2863affbab 100644
--- a/synapse/rest/client/v2_alpha/user_directory.py
+++ b/synapse/rest/client/v2_alpha/user_directory.py
@@ -60,7 +60,7 @@ class UserDirectorySearchRestServlet(RestServlet):
user_id = requester.user.to_string()
if not self.hs.config.user_directory_search_enabled:
- return (200, {"limited": False, "results": []})
+ return 200, {"limited": False, "results": []}
body = parse_json_object_from_request(request)
@@ -76,7 +76,7 @@ class UserDirectorySearchRestServlet(RestServlet):
user_id, search_term, limit
)
- return (200, results)
+ return 200, results
def register_servlets(hs, http_server):
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index cf5759e9a6..b972e152a9 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -318,14 +318,14 @@ class MediaRepository(object):
responder = yield self.media_storage.fetch_media(file_info)
if responder:
- return (responder, media_info)
+ return responder, media_info
# Failed to find the file anywhere, lets download it.
media_info = yield self._download_remote_file(server_name, media_id, file_id)
responder = yield self.media_storage.fetch_media(file_info)
- return (responder, media_info)
+ return responder, media_info
@defer.inlineCallbacks
def _download_remote_file(self, server_name, media_id, file_id):
@@ -526,7 +526,7 @@ class MediaRepository(object):
try:
file_info = FileInfo(
server_name=server_name,
- file_id=media_id,
+ file_id=file_id,
thumbnail=True,
thumbnail_width=t_width,
thumbnail_height=t_height,
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index bd40891a7f..7a56cd4b6c 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -183,7 +183,6 @@ class PreviewUrlResource(DirectServeResource):
if isinstance(og, six.text_type):
og = og.encode("utf8")
return og
- return
media_info = yield self._download_url(url, user)
diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py
index 90d8e6bffe..c995d7e043 100644
--- a/synapse/rest/media/v1/thumbnailer.py
+++ b/synapse/rest/media/v1/thumbnailer.py
@@ -78,9 +78,9 @@ class Thumbnailer(object):
"""
if max_width * self.height < max_height * self.width:
- return (max_width, (max_width * self.height) // self.width)
+ return max_width, (max_width * self.height) // self.width
else:
- return ((max_height * self.width) // self.height, max_height)
+ return (max_height * self.width) // self.height, max_height
def scale(self, width, height, output_type):
"""Rescales the image to the given dimensions.
diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py
index 729c097e6d..81c4aff496 100644
--- a/synapse/server_notices/resource_limits_server_notices.py
+++ b/synapse/server_notices/resource_limits_server_notices.py
@@ -193,4 +193,4 @@ class ResourceLimitsServerNotices(object):
if event_id in referenced_events:
referenced_events.remove(event.event_id)
- return (currently_blocked, referenced_events)
+ return currently_blocked, referenced_events
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index a0d34f16ea..2b0f4c79ee 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -136,7 +136,6 @@ class StateHandler(object):
if event_id:
event = yield self.store.get_event(event_id, allow_none=True)
return event
- return
state_map = yield self.store.get_events(
list(state.values()), get_prev_content=False
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 9fa5b4f3d6..6afbfc0d74 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -90,7 +90,7 @@ class AccountDataWorkerStore(SQLBaseStore):
room_data = by_room.setdefault(row["room_id"], {})
room_data[row["account_data_type"]] = json.loads(row["content"])
- return (global_account_data, by_room)
+ return global_account_data, by_room
return self.runInteraction(
"get_account_data_for_user", get_account_data_for_user_txn
@@ -205,7 +205,7 @@ class AccountDataWorkerStore(SQLBaseStore):
)
txn.execute(sql, (last_room_id, current_id, limit))
room_results = txn.fetchall()
- return (global_results, room_results)
+ return global_results, room_results
return self.runInteraction(
"get_all_updated_account_data_txn", get_updated_account_data_txn
@@ -244,13 +244,13 @@ class AccountDataWorkerStore(SQLBaseStore):
room_account_data = account_data_by_room.setdefault(row[0], {})
room_account_data[row[1]] = json.loads(row[2])
- return (global_account_data, account_data_by_room)
+ return global_account_data, account_data_by_room
changed = self._account_data_stream_cache.has_entity_changed(
user_id, int(stream_id)
)
if not changed:
- return ({}, {})
+ return {}, {}
return self.runInteraction(
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 05d9c05c3f..435b2acd4d 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -165,7 +165,6 @@ class ApplicationServiceTransactionWorkerStore(
)
if result:
return result.get("state")
- return
return None
def set_appservice_state(self, service, state):
@@ -358,7 +357,7 @@ class ApplicationServiceTransactionWorkerStore(
events = yield self.get_events_as_list(event_ids)
- return (upper_bound, events)
+ return upper_bound, events
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 79bb0ea46d..6b7458304e 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -19,6 +19,7 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache
@@ -66,12 +67,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
- return (messages, stream_pos)
+ return messages, stream_pos
return self.runInteraction(
"get_new_messages_for_device", get_new_messages_for_device_txn
)
+ @trace
@defer.inlineCallbacks
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
"""
@@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), None
)
+
+ set_tag("last_deleted_stream_id", last_deleted_stream_id)
+
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_deleted_stream_id
)
if not has_changed:
+ log_kv({"message": "No changes in cache since last check"})
return 0
def delete_messages_for_device_txn(txn):
@@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"delete_messages_for_device", delete_messages_for_device_txn
)
+ log_kv(
+ {"message": "deleted {} messages for device".format(count), "count": count}
+ )
+
# Update the cache, ensuring that we only ever increase the value
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
@@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count
+ @trace
def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit
):
@@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
in the stream the messages got to.
"""
+ set_tag("destination", destination)
+ set_tag("last_stream_id", last_stream_id)
+ set_tag("current_stream_id", current_stream_id)
+ set_tag("limit", limit)
+
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
)
if not has_changed or last_stream_id == current_stream_id:
+ log_kv({"message": "No new messages in stream"})
return defer.succeed(([], current_stream_id))
if limit <= 0:
# This can happen if we run out of room for EDUs in the transaction.
return defer.succeed(([], last_stream_id))
+ @trace
def get_new_messages_for_remote_destination_txn(txn):
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -156,14 +174,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = row[0]
messages.append(json.loads(row[1]))
if len(messages) < limit:
+ log_kv({"message": "Set stream position to current position"})
stream_pos = current_stream_id
- return (messages, stream_pos)
+ return messages, stream_pos
return self.runInteraction(
"get_new_device_msgs_for_remote",
get_new_messages_for_remote_destination_txn,
)
+ @trace
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
"""Used to delete messages when the remote destination acknowledges
their receipt.
@@ -214,6 +234,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
expiry_ms=30 * 60 * 1000,
)
+ @trace
@defer.inlineCallbacks
def add_messages_to_device_inbox(
self, local_messages_by_user_then_device, remote_messages_by_destination
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index e11881161d..79a58df591 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.logging.opentracing import (
get_active_span_text_map,
+ set_tag,
trace,
whitelisted_homeserver,
)
@@ -94,7 +95,7 @@ class DeviceWorkerStore(SQLBaseStore):
destination, int(from_stream_id)
)
if not has_changed:
- return (now_stream_id, [])
+ return now_stream_id, []
# We retrieve n+1 devices from the list of outbound pokes where n is
# our outbound device update limit. We then check if the very last
@@ -117,7 +118,7 @@ class DeviceWorkerStore(SQLBaseStore):
# Return an empty list if there are no updates
if not updates:
- return (now_stream_id, [])
+ return now_stream_id, []
# if we have exceeded the limit, we need to exclude any results with the
# same stream_id as the last row.
@@ -167,13 +168,13 @@ class DeviceWorkerStore(SQLBaseStore):
# skip that stream_id and return an empty list, and continue with the next
# stream_id next time.
if not query_map:
- return (stream_id_cutoff, [])
+ return stream_id_cutoff, []
results = yield self._get_device_update_edus_by_remote(
destination, from_stream_id, query_map
)
- return (now_stream_id, results)
+ return now_stream_id, results
def _get_devices_by_remote_txn(
self, txn, destination, from_stream_id, now_stream_id, limit
@@ -321,6 +322,7 @@ class DeviceWorkerStore(SQLBaseStore):
def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token()
+ @trace
@defer.inlineCallbacks
def get_user_devices_from_cache(self, query_list):
"""Get the devices (and keys if any) for remote users from the cache.
@@ -352,7 +354,10 @@ class DeviceWorkerStore(SQLBaseStore):
else:
results[user_id] = yield self._get_cached_devices_for_user(user_id)
- return (user_ids_not_in_cache, results)
+ set_tag("in_cache", results)
+ set_tag("not_in_cache", user_ids_not_in_cache)
+
+ return user_ids_not_in_cache, results
@cachedInlineCallbacks(num_args=2, tree=True)
def _get_cached_user_device(self, user_id, device_id):
@@ -851,7 +856,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"ts": now,
"opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
- else None,
+ else "{}",
}
for destination in hosts
for device_id in device_ids
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index e966a73f3d..eed7757ed5 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -47,7 +47,6 @@ class DirectoryWorkerStore(SQLBaseStore):
if not room_id:
return None
- return
servers = yield self._simple_select_onecol(
"room_alias_servers",
@@ -58,7 +57,6 @@ class DirectoryWorkerStore(SQLBaseStore):
if not servers:
return None
- return
return RoomAliasMapping(room_id, room_alias.to_string(), servers)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d0d1781c90..a5d13ddc49 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -818,7 +818,7 @@ class EventsStore(
# If they old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
- return (None, None)
+ return None, None
if len(new_state_groups) == 1 and len(old_state_groups) == 1:
# If we're going from one state group to another, lets check if
@@ -835,7 +835,7 @@ class EventsStore(
# the current state in memory then lets also return that,
# but it doesn't matter if we don't.
new_state = state_groups_map.get(new_state_group)
- return (new_state, delta_ids)
+ return new_state, delta_ids
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@@ -847,7 +847,7 @@ class EventsStore(
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- return (state_groups_map[new_state_groups.pop()], None)
+ return state_groups_map[new_state_groups.pop()], None
# Ok, we need to defer to the state handler to resolve our state sets.
@@ -876,7 +876,7 @@ class EventsStore(
state_res_store=StateResolutionStore(self),
)
- return (res.state, None)
+ return res.state, None
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -899,7 +899,7 @@ class EventsStore(
if ev_id != existing_state.get(key)
}
- return (to_delete, to_insert)
+ return to_delete, to_insert
@log_function
def _persist_events_txn(
@@ -2358,8 +2358,9 @@ class EventsStore(
"room_aliases",
"room_depth",
"room_memberships",
- "room_state",
- "room_stats",
+ "room_stats_state",
+ "room_stats_current",
+ "room_stats_historical",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 1a0f2d5768..5db6f2d84a 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -90,7 +90,7 @@ class PresenceStore(SQLBaseStore):
presence_states,
)
- return (stream_orderings[-1], self._presence_id_gen.get_current_token())
+ return stream_orderings[-1], self._presence_id_gen.get_current_token()
def _update_presence_txn(self, txn, stream_orderings, presence_states):
for stream_id, state in zip(stream_orderings, presence_states):
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 8a5d8e9b18..912c1df6be 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -35,7 +35,6 @@ class ProfileWorkerStore(SQLBaseStore):
if e.code == 404:
# no match
return ProfileInfo(None, None)
- return
else:
raise
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index b431d24b8a..3e0e834a62 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -133,7 +133,7 @@ class PusherWorkerStore(SQLBaseStore):
txn.execute(sql, (last_id, current_id, limit))
deleted = txn.fetchall()
- return (updated, deleted)
+ return updated, deleted
return self.runInteraction(
"get_all_updated_pushers", get_all_updated_pushers_txn
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6aa6d98ebb..290ddb30e8 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -478,7 +478,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
max_persisted_id = self._receipts_id_gen.get_current_token()
- return (stream_id, max_persisted_id)
+ return stream_id, max_persisted_id
def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, data):
return self.runInteraction(
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 3f50324253..2d3c7e2dc9 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -869,6 +869,17 @@ class RegistrationStore(
(user_id_obj.localpart, create_profile_with_displayname),
)
+ if self.hs.config.stats_enabled:
+ # we create a new completed user statistics row
+
+ # we don't strictly need current_token since this user really can't
+ # have any state deltas before now (as it is a new user), but still,
+ # we include it for completeness.
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+ self._update_stats_delta_txn(
+ txn, now, "user", user_id, {}, complete_with_stream_id=current_token
+ )
+
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
@@ -1140,6 +1151,7 @@ class RegistrationStore(
deferred str|None: A str representing a link to redirect the user
to if there is one.
"""
+
# Insert everything into a transaction in order to run atomically
def validate_threepid_session_txn(txn):
row = self._simple_select_one_txn(
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index eecb276465..f8b682ebd9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -112,29 +112,31 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
- def f(txn):
- # If we can assume current_state_events.membership is up to date
- # then we can avoid a join, which is a Very Good Thing given how
- # frequently this function gets called.
- if self._current_state_events_membership_up_to_date:
- sql = """
- SELECT state_key FROM current_state_events
- WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
- """
- else:
- sql = """
- SELECT state_key FROM room_memberships as m
- INNER JOIN current_state_events as c
- ON m.event_id = c.event_id
- AND m.room_id = c.room_id
- AND m.user_id = c.state_key
- WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
- """
+ return self.runInteraction(
+ "get_users_in_room", self.get_users_in_room_txn, room_id
+ )
- txn.execute(sql, (room_id, Membership.JOIN))
- return [to_ascii(r[0]) for r in txn]
+ def get_users_in_room_txn(self, txn, room_id):
+ # If we can assume current_state_events.membership is up to date
+ # then we can avoid a join, which is a Very Good Thing given how
+ # frequently this function gets called.
+ if self._current_state_events_membership_up_to_date:
+ sql = """
+ SELECT state_key FROM current_state_events
+ WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+ """
+ else:
+ sql = """
+ SELECT state_key FROM room_memberships as m
+ INNER JOIN current_state_events as c
+ ON m.event_id = c.event_id
+ AND m.room_id = c.room_id
+ AND m.user_id = c.state_key
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+ """
- return self.runInteraction("get_users_in_room", f)
+ txn.execute(sql, (room_id, Membership.JOIN))
+ return [to_ascii(r[0]) for r in txn]
@cached(max_entries=100000)
def get_room_summary(self, room_id):
diff --git a/synapse/storage/schema/delta/56/stats_separated.sql b/synapse/storage/schema/delta/56/stats_separated.sql
new file mode 100644
index 0000000000..163529c071
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated.sql
@@ -0,0 +1,152 @@
+/* Copyright 2018 New Vector Ltd
+ * Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+----- First clean up from previous versions of room stats.
+
+-- First remove old stats stuff
+DROP TABLE IF EXISTS room_stats;
+DROP TABLE IF EXISTS room_state;
+DROP TABLE IF EXISTS room_stats_state;
+DROP TABLE IF EXISTS user_stats;
+DROP TABLE IF EXISTS room_stats_earliest_tokens;
+DROP TABLE IF EXISTS _temp_populate_stats_position;
+DROP TABLE IF EXISTS _temp_populate_stats_rooms;
+DROP TABLE IF EXISTS stats_stream_pos;
+
+-- Unschedule old background updates if they're still scheduled
+DELETE FROM background_updates WHERE update_name IN (
+ 'populate_stats_createtables',
+ 'populate_stats_process_rooms',
+ 'populate_stats_process_users',
+ 'populate_stats_cleanup'
+);
+
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_rooms', '{}', '');
+
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+----- Create tables for our version of room stats.
+
+-- single-row table to track position of incremental updates
+DROP TABLE IF EXISTS stats_incremental_position;
+CREATE TABLE stats_incremental_position (
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT NOT NULL,
+ CHECK (Lock='X')
+);
+
+-- insert a null row and make sure it is the only one.
+INSERT INTO stats_incremental_position (
+ stream_id
+) SELECT COALESCE(MAX(stream_ordering), 0) from events;
+
+-- represents PRESENT room statistics for a room
+-- only holds absolute fields
+DROP TABLE IF EXISTS room_stats_current;
+CREATE TABLE room_stats_current (
+ room_id TEXT NOT NULL PRIMARY KEY,
+
+ -- These are absolute counts
+ current_state_events INT NOT NULL,
+ joined_members INT NOT NULL,
+ invited_members INT NOT NULL,
+ left_members INT NOT NULL,
+ banned_members INT NOT NULL,
+
+ local_users_in_room INT NOT NULL,
+
+ -- The maximum delta stream position that this row takes into account.
+ completed_delta_stream_id BIGINT NOT NULL
+);
+
+
+-- represents HISTORICAL room statistics for a room
+DROP TABLE IF EXISTS room_stats_historical;
+CREATE TABLE room_stats_historical (
+ room_id TEXT NOT NULL,
+ -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
+ -- Note that end_ts is quantised.
+ end_ts BIGINT NOT NULL,
+ bucket_size BIGINT NOT NULL,
+
+ -- These stats are absolute counts
+ current_state_events BIGINT NOT NULL,
+ joined_members BIGINT NOT NULL,
+ invited_members BIGINT NOT NULL,
+ left_members BIGINT NOT NULL,
+ banned_members BIGINT NOT NULL,
+ local_users_in_room BIGINT NOT NULL,
+
+ -- These stats are per time slice
+ total_events BIGINT NOT NULL,
+ total_event_bytes BIGINT NOT NULL,
+
+ PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- represents PRESENT statistics for a user
+-- only holds absolute fields
+DROP TABLE IF EXISTS user_stats_current;
+CREATE TABLE user_stats_current (
+ user_id TEXT NOT NULL PRIMARY KEY,
+
+ joined_rooms BIGINT NOT NULL,
+
+ -- The maximum delta stream position that this row takes into account.
+ completed_delta_stream_id BIGINT NOT NULL
+);
+
+-- represents HISTORICAL statistics for a user
+DROP TABLE IF EXISTS user_stats_historical;
+CREATE TABLE user_stats_historical (
+ user_id TEXT NOT NULL,
+ end_ts BIGINT NOT NULL,
+ bucket_size BIGINT NOT NULL,
+
+ joined_rooms BIGINT NOT NULL,
+
+ invites_sent BIGINT NOT NULL,
+ rooms_created BIGINT NOT NULL,
+ total_events BIGINT NOT NULL,
+ total_event_bytes BIGINT NOT NULL,
+
+ PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+
+CREATE TABLE room_stats_state (
+ room_id TEXT NOT NULL,
+ name TEXT,
+ canonical_alias TEXT,
+ join_rules TEXT,
+ history_visibility TEXT,
+ encryption TEXT,
+ avatar TEXT,
+ guest_access TEXT,
+ is_federatable BOOLEAN,
+ topic TEXT
+);
+
+CREATE UNIQUE INDEX room_stats_state_room ON room_stats_state(room_id);
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..6560173c08 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,17 +15,22 @@
# limitations under the License.
import logging
+from itertools import chain
from twisted.internet import defer
+from twisted.internet.defer import DeferredLock
from synapse.api.constants import EventTypes, Membership
-from synapse.storage.prepare_database import get_statements
+from synapse.storage import PostgresEngine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
ABSOLUTE_STATS_FIELDS = {
"room": (
"current_state_events",
@@ -32,14 +38,23 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
- "state_events",
+ "local_users_in_room",
),
- "user": ("public_rooms", "private_rooms"),
+ "user": ("joined_rooms",),
}
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
+PER_SLICE_FIELDS = {
+ "room": ("total_events", "total_event_bytes"),
+ "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
+}
+
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
-TEMP_TABLE = "_temp_populate_stats"
+# these are the tables (& ID columns) which contain our actual subjects
+TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
class StatsStore(StateDeltasStore):
@@ -51,136 +66,102 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
- self.register_background_update_handler(
- "populate_stats_createtables", self._populate_stats_createtables
- )
+ self.stats_delta_processing_lock = DeferredLock()
+
self.register_background_update_handler(
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
self.register_background_update_handler(
- "populate_stats_cleanup", self._populate_stats_cleanup
+ "populate_stats_process_users", self._populate_stats_process_users
)
+ # we no longer need to perform clean-up, but we will give ourselves
+ # the potential to reintroduce it in the future ā so documentation
+ # will still encourage the use of this no-op handler.
+ self.register_noop_background_update("populate_stats_cleanup")
+ self.register_noop_background_update("populate_stats_prepare")
- @defer.inlineCallbacks
- def _populate_stats_createtables(self, progress, batch_size):
-
- if not self.stats_enabled:
- yield self._end_background_update("populate_stats_createtables")
- return 1
-
- # Get all the rooms that we want to process.
- def _make_staging_area(txn):
- # Create the temporary tables
- stmts = get_statements(
- """
- -- We just recreate the table, we'll be reinserting the
- -- correct entries again later anyway.
- DROP TABLE IF EXISTS {temp}_rooms;
-
- CREATE TABLE IF NOT EXISTS {temp}_rooms(
- room_id TEXT NOT NULL,
- events BIGINT NOT NULL
- );
-
- CREATE INDEX {temp}_rooms_events
- ON {temp}_rooms(events);
- CREATE INDEX {temp}_rooms_id
- ON {temp}_rooms(room_id);
- """.format(
- temp=TEMP_TABLE
- ).splitlines()
- )
-
- for statement in stmts:
- txn.execute(statement)
-
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_position(position TEXT NOT NULL)"
- )
- txn.execute(sql)
-
- # Get rooms we want to process from the database, only adding
- # those that we haven't (i.e. those not in room_stats_earliest_token)
- sql = """
- INSERT INTO %s_rooms (room_id, events)
- SELECT c.room_id, count(*) FROM current_state_events AS c
- LEFT JOIN room_stats_earliest_token AS t USING (room_id)
- WHERE t.room_id IS NULL
- GROUP BY c.room_id
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
+ def quantise_stats_time(self, ts):
+ """
+ Quantises a timestamp to be a multiple of the bucket size.
- new_pos = yield self.get_max_stream_id_in_current_state_deltas()
- yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
- yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
- self.get_earliest_token_for_room_stats.invalidate_all()
+ Args:
+ ts (int): the timestamp to quantise, in milliseconds since the Unix
+ Epoch
- yield self._end_background_update("populate_stats_createtables")
- return 1
+ Returns:
+ int: a timestamp which
+ - is divisible by the bucket size;
+ - is no later than `ts`; and
+ - is the largest such timestamp.
+ """
+ return (ts // self.stats_bucket_size) * self.stats_bucket_size
@defer.inlineCallbacks
- def _populate_stats_cleanup(self, progress, batch_size):
+ def _populate_stats_process_users(self, progress, batch_size):
"""
- Update the user directory stream position, then clean up the old tables.
+ This is a background update which regenerates statistics for users.
"""
if not self.stats_enabled:
- yield self._end_background_update("populate_stats_cleanup")
+ yield self._end_background_update("populate_stats_process_users")
return 1
- position = yield self._simple_select_one_onecol(
- TEMP_TABLE + "_position", None, "position"
+ last_user_id = progress.get("last_user_id", "")
+
+ def _get_next_batch(txn):
+ sql = """
+ SELECT DISTINCT name FROM users
+ WHERE name > ?
+ ORDER BY name ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_user_id, batch_size))
+ return [r for r, in txn]
+
+ users_to_work_on = yield self.runInteraction(
+ "_populate_stats_process_users", _get_next_batch
)
- yield self.update_stats_stream_pos(position)
- def _delete_staging_area(txn):
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+ # No more rooms -- complete the transaction.
+ if not users_to_work_on:
+ yield self._end_background_update("populate_stats_process_users")
+ return 1
- yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+ for user_id in users_to_work_on:
+ yield self._calculate_and_set_initial_state_for_user(user_id)
+ progress["last_user_id"] = user_id
- yield self._end_background_update("populate_stats_cleanup")
- return 1
+ yield self.runInteraction(
+ "populate_stats_process_users",
+ self._background_update_progress_txn,
+ "populate_stats_process_users",
+ progress,
+ )
+
+ return len(users_to_work_on)
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
-
+ """
+ This is a background update which regenerates statistics for rooms.
+ """
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
return 1
- # If we don't have progress filed, delete everything.
- if not progress:
- yield self.delete_all_stats()
+ last_room_id = progress.get("last_room_id", "")
def _get_next_batch(txn):
- # Only fetch 250 rooms, so we don't fetch too many at once, even
- # if those 250 rooms have less than batch_size state events.
sql = """
- SELECT room_id, events FROM %s_rooms
- ORDER BY events DESC
- LIMIT 250
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
- rooms_to_work_on = txn.fetchall()
-
- if not rooms_to_work_on:
- return None
-
- # Get how many are left to process, so we can give status on how
- # far we are in processing
- txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
- progress["remaining"] = txn.fetchone()[0]
-
- return rooms_to_work_on
+ SELECT DISTINCT room_id FROM current_state_events
+ WHERE room_id > ?
+ ORDER BY room_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_room_id, batch_size))
+ return [r for r, in txn]
rooms_to_work_on = yield self.runInteraction(
- "populate_stats_temp_read", _get_next_batch
+ "populate_stats_rooms_get_batch", _get_next_batch
)
# No more rooms -- complete the transaction.
@@ -188,154 +169,28 @@ class StatsStore(StateDeltasStore):
yield self._end_background_update("populate_stats_process_rooms")
return 1
- logger.info(
- "Processing the next %d rooms of %d remaining",
- len(rooms_to_work_on),
- progress["remaining"],
- )
-
- # Number of state events we've processed by going through each room
- processed_event_count = 0
-
- for room_id, event_count in rooms_to_work_on:
-
- current_state_ids = yield self.get_current_state_ids(room_id)
-
- join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
- history_visibility_id = current_state_ids.get(
- (EventTypes.RoomHistoryVisibility, "")
- )
- encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
- name_id = current_state_ids.get((EventTypes.Name, ""))
- topic_id = current_state_ids.get((EventTypes.Topic, ""))
- avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
- canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
-
- event_ids = [
- join_rules_id,
- history_visibility_id,
- encryption_id,
- name_id,
- topic_id,
- avatar_id,
- canonical_alias_id,
- ]
-
- state_events = yield self.get_events(
- [ev for ev in event_ids if ev is not None]
- )
-
- def _get_or_none(event_id, arg):
- event = state_events.get(event_id)
- if event:
- return event.content.get(arg)
- return None
-
- yield self.update_room_state(
- room_id,
- {
- "join_rules": _get_or_none(join_rules_id, "join_rule"),
- "history_visibility": _get_or_none(
- history_visibility_id, "history_visibility"
- ),
- "encryption": _get_or_none(encryption_id, "algorithm"),
- "name": _get_or_none(name_id, "name"),
- "topic": _get_or_none(topic_id, "topic"),
- "avatar": _get_or_none(avatar_id, "url"),
- "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
- },
- )
+ for room_id in rooms_to_work_on:
+ yield self._calculate_and_set_initial_state_for_room(room_id)
+ progress["last_room_id"] = room_id
- now = self.hs.get_reactor().seconds()
-
- # quantise time to the nearest bucket
- now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
- def _fetch_data(txn):
-
- # Get the current token of the room
- current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
-
- current_state_events = len(current_state_ids)
-
- membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
-
- total_state_events = self._get_total_state_event_counts_txn(
- txn, room_id
- )
-
- self._update_stats_txn(
- txn,
- "room",
- room_id,
- now,
- {
- "bucket_size": self.stats_bucket_size,
- "current_state_events": current_state_events,
- "joined_members": membership_counts.get(Membership.JOIN, 0),
- "invited_members": membership_counts.get(Membership.INVITE, 0),
- "left_members": membership_counts.get(Membership.LEAVE, 0),
- "banned_members": membership_counts.get(Membership.BAN, 0),
- "state_events": total_state_events,
- },
- )
- self._simple_insert_txn(
- txn,
- "room_stats_earliest_token",
- {"room_id": room_id, "token": current_token},
- )
-
- # We've finished a room. Delete it from the table.
- self._simple_delete_one_txn(
- txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
- )
-
- yield self.runInteraction("update_room_stats", _fetch_data)
-
- # Update the remaining counter.
- progress["remaining"] -= 1
- yield self.runInteraction(
- "populate_stats",
- self._background_update_progress_txn,
- "populate_stats_process_rooms",
- progress,
- )
-
- processed_event_count += event_count
-
- if processed_event_count > batch_size:
- # Don't process any more rooms, we've hit our batch size.
- return processed_event_count
+ yield self.runInteraction(
+ "_populate_stats_process_rooms",
+ self._background_update_progress_txn,
+ "populate_stats_process_rooms",
+ progress,
+ )
- return processed_event_count
+ return len(rooms_to_work_on)
- def delete_all_stats(self):
+ def get_stats_positions(self):
"""
- Delete all statistics records.
+ Returns the stats processor positions.
"""
-
- def _delete_all_stats_txn(txn):
- txn.execute("DELETE FROM room_state")
- txn.execute("DELETE FROM room_stats")
- txn.execute("DELETE FROM room_stats_earliest_token")
- txn.execute("DELETE FROM user_stats")
-
- return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
-
- def get_stats_stream_pos(self):
return self._simple_select_one_onecol(
- table="stats_stream_pos",
+ table="stats_incremental_position",
keyvalues={},
retcol="stream_id",
- desc="stats_stream_pos",
- )
-
- def update_stats_stream_pos(self, stream_id):
- return self._simple_update_one(
- table="stats_stream_pos",
- keyvalues={},
- updatevalues={"stream_id": stream_id},
- desc="update_stats_stream_pos",
+ desc="stats_incremental_position",
)
def update_room_state(self, room_id, fields):
@@ -361,42 +216,87 @@ class StatsStore(StateDeltasStore):
fields[col] = None
return self._simple_upsert(
- table="room_state",
+ table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
)
- def get_deltas_for_room(self, room_id, start, size=100):
+ def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
"""
- Get statistics deltas for a given room.
+ Get statistics for a given subject.
Args:
- room_id (str)
+ stats_type (str): The type of subject
+ stats_id (str): The ID of the subject (e.g. room_id or user_id)
start (int): Pagination start. Number of entries, not timestamp.
size (int): How many entries to return.
Returns:
Deferred[list[dict]], where the dict has the keys of
- ABSOLUTE_STATS_FIELDS["room"] and "ts".
+ ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
"""
- return self._simple_select_list_paginate(
- "room_stats",
- {"room_id": room_id},
- "ts",
+ return self.runInteraction(
+ "get_statistics_for_subject",
+ self._get_statistics_for_subject_txn,
+ stats_type,
+ stats_id,
+ start,
+ size,
+ )
+
+ def _get_statistics_for_subject_txn(
+ self, txn, stats_type, stats_id, start, size=100
+ ):
+ """
+ Transaction-bound version of L{get_statistics_for_subject}.
+ """
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+ selected_columns = list(
+ ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+ )
+
+ slice_list = self._simple_select_list_paginate_txn(
+ txn,
+ table + "_historical",
+ {id_col: stats_id},
+ "end_ts",
start,
size,
- retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+ retcols=selected_columns + ["bucket_size", "end_ts"],
order_direction="DESC",
)
- def get_all_room_state(self):
- return self._simple_select_list(
- "room_state", None, retcols=("name", "topic", "canonical_alias")
+ return slice_list
+
+ def get_room_stats_state(self, room_id):
+ """
+ Returns the current room_stats_state for a room.
+
+ Args:
+ room_id (str): The ID of the room to return state for.
+
+ Returns (dict):
+ Dictionary containing these keys:
+ "name", "topic", "canonical_alias", "avatar", "join_rules",
+ "history_visibility"
+ """
+ return self._simple_select_one(
+ "room_stats_state",
+ {"room_id": room_id},
+ retcols=(
+ "name",
+ "topic",
+ "canonical_alias",
+ "avatar",
+ "join_rules",
+ "history_visibility",
+ ),
)
@cached()
- def get_earliest_token_for_room_stats(self, room_id):
+ def get_earliest_token_for_stats(self, stats_type, id):
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
@@ -406,79 +306,571 @@ class StatsStore(StateDeltasStore):
Returns:
Deferred[int]
"""
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
return self._simple_select_one_onecol(
- "room_stats_earliest_token",
- {"room_id": room_id},
- retcol="token",
+ "%s_current" % (table,),
+ keyvalues={id_col: id},
+ retcol="completed_delta_stream_id",
allow_none=True,
)
- def update_stats(self, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert(
- table=table,
- keyvalues={id_col: stats_id, "ts": ts},
- values=fields,
- desc="update_stats",
+ def bulk_update_stats_delta(self, ts, updates, stream_id):
+ """Bulk update stats tables for a given stream_id and updates the stats
+ incremental position.
+
+ Args:
+ ts (int): Current timestamp in ms
+ updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
+ commit as a mapping stats_type -> stats_id -> field -> delta.
+ stream_id (int): Current position.
+
+ Returns:
+ Deferred
+ """
+
+ def _bulk_update_stats_delta_txn(txn):
+ for stats_type, stats_updates in updates.items():
+ for stats_id, fields in stats_updates.items():
+ self._update_stats_delta_txn(
+ txn,
+ ts=ts,
+ stats_type=stats_type,
+ stats_id=stats_id,
+ fields=fields,
+ complete_with_stream_id=stream_id,
+ )
+
+ self._simple_update_one_txn(
+ txn,
+ table="stats_incremental_position",
+ keyvalues={},
+ updatevalues={"stream_id": stream_id},
+ )
+
+ return self.runInteraction(
+ "bulk_update_stats_delta", _bulk_update_stats_delta_txn
)
- def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert_txn(
- txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+ def update_stats_delta(
+ self,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id,
+ absolute_field_overrides=None,
+ ):
+ """
+ Updates the statistics for a subject, with a delta (difference/relative
+ change).
+
+ Args:
+ ts (int): timestamp of the change
+ stats_type (str): "room" or "user" ā the kind of subject
+ stats_id (str): the subject's ID (room ID or user ID)
+ fields (dict[str, int]): Deltas of stats values.
+ complete_with_stream_id (int, optional):
+ If supplied, converts an incomplete row into a complete row,
+ with the supplied stream_id marked as the stream_id where the
+ row was completed.
+ absolute_field_overrides (dict[str, int]): Current stats values
+ (i.e. not deltas) of absolute fields.
+ Does not work with per-slice fields.
+ """
+
+ return self.runInteraction(
+ "update_stats_delta",
+ self._update_stats_delta_txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=complete_with_stream_id,
+ absolute_field_overrides=absolute_field_overrides,
)
- def update_stats_delta(self, ts, stats_type, stats_id, field, value):
- def _update_stats_delta(txn):
- table, id_col = TYPE_TO_ROOM[stats_type]
-
- sql = (
- "SELECT * FROM %s"
- " WHERE %s=? and ts=("
- " SELECT MAX(ts) FROM %s"
- " WHERE %s=?"
- ")"
- ) % (table, id_col, table, id_col)
- txn.execute(sql, (stats_id, stats_id))
- rows = self.cursor_to_dict(txn)
- if len(rows) == 0:
- # silently skip as we don't have anything to apply a delta to yet.
- # this tries to minimise any race between the initial sync and
- # subsequent deltas arriving.
- return
-
- current_ts = ts
- latest_ts = rows[0]["ts"]
- if current_ts < latest_ts:
- # This one is in the past, but we're just encountering it now.
- # Mark it as part of the current bucket.
- current_ts = latest_ts
- elif ts != latest_ts:
- # we have to copy our absolute counters over to the new entry.
- values = {
- key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
- }
- values[id_col] = stats_id
- values["ts"] = ts
- values["bucket_size"] = self.stats_bucket_size
-
- self._simple_insert_txn(txn, table=table, values=values)
-
- # actually update the new value
- if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
- self._simple_update_txn(
- txn,
- table=table,
- keyvalues={id_col: stats_id, "ts": current_ts},
- updatevalues={field: value},
+ def _update_stats_delta_txn(
+ self,
+ txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id,
+ absolute_field_overrides=None,
+ ):
+ if absolute_field_overrides is None:
+ absolute_field_overrides = {}
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
+ quantised_ts = self.quantise_stats_time(int(ts))
+ end_ts = quantised_ts + self.stats_bucket_size
+
+ # Lets be paranoid and check that all the given field names are known
+ abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+ slice_field_names = PER_SLICE_FIELDS[stats_type]
+ for field in chain(fields.keys(), absolute_field_overrides.keys()):
+ if field not in abs_field_names and field not in slice_field_names:
+ # guard against potential SQL injection dodginess
+ raise ValueError(
+ "%s is not a recognised field"
+ " for stats type %s" % (field, stats_type)
)
+
+ # Per slice fields do not get added to the _current table
+
+ # This calculates the deltas (`field = field + ?` values)
+ # for absolute fields,
+ # * defaulting to 0 if not specified
+ # (required for the INSERT part of upserting to work)
+ # * omitting overrides specified in `absolute_field_overrides`
+ deltas_of_absolute_fields = {
+ key: fields.get(key, 0)
+ for key in abs_field_names
+ if key not in absolute_field_overrides
+ }
+
+ # Keep the delta stream ID field up to date
+ absolute_field_overrides = absolute_field_overrides.copy()
+ absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id
+
+ # first upsert the `_current` table
+ self._upsert_with_additive_relatives_txn(
+ txn=txn,
+ table=table + "_current",
+ keyvalues={id_col: stats_id},
+ absolutes=absolute_field_overrides,
+ additive_relatives=deltas_of_absolute_fields,
+ )
+
+ per_slice_additive_relatives = {
+ key: fields.get(key, 0) for key in slice_field_names
+ }
+ self._upsert_copy_from_table_with_additive_relatives_txn(
+ txn=txn,
+ into_table=table + "_historical",
+ keyvalues={id_col: stats_id},
+ extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
+ extra_dst_keyvalues={"end_ts": end_ts},
+ additive_relatives=per_slice_additive_relatives,
+ src_table=table + "_current",
+ copy_columns=abs_field_names,
+ )
+
+ def _upsert_with_additive_relatives_txn(
+ self, txn, table, keyvalues, absolutes, additive_relatives
+ ):
+ """Used to update values in the stats tables.
+
+ This is basically a slightly convoluted upsert that *adds* to any
+ existing rows.
+
+ Args:
+ txn
+ table (str): Table name
+ keyvalues (dict[str, any]): Row-identifying key values
+ absolutes (dict[str, any]): Absolute (set) fields
+ additive_relatives (dict[str, int]): Fields that will be added onto
+ if existing row present.
+ """
+ if self.database_engine.can_native_upsert:
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+ qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+ "table": table,
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "key_columns": ", ".join(keyvalues),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
+
+ txn.execute(sql, qargs)
+ else:
+ self.database_engine.lock_table(txn, table)
+ retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+ current_row = self._simple_select_one_txn(
+ txn, table, keyvalues, retcols, allow_none=True
+ )
+ if current_row is None:
+ merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+ self._simple_insert_txn(txn, table, merged_dict)
+ else:
+ for (key, val) in additive_relatives.items():
+ current_row[key] += val
+ current_row.update(absolutes)
+ self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+ def _upsert_copy_from_table_with_additive_relatives_txn(
+ self,
+ txn,
+ into_table,
+ keyvalues,
+ extra_dst_keyvalues,
+ extra_dst_insvalues,
+ additive_relatives,
+ src_table,
+ copy_columns,
+ ):
+ """Updates the historic stats table with latest updates.
+
+ This involves copying "absolute" fields from the `_current` table, and
+ adding relative fields to any existing values.
+
+ Args:
+ txn: Transaction
+ into_table (str): The destination table to UPSERT the row into
+ keyvalues (dict[str, any]): Row-identifying key values
+ extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+ for `into_table`.
+ extra_dst_insvalues (dict[str, any]): Additional values to insert
+ on new row creation for `into_table`.
+ additive_relatives (dict[str, any]): Fields that will be added onto
+ if existing row present. (Must be disjoint from copy_columns.)
+ src_table (str): The source table to copy from
+ copy_columns (iterable[str]): The list of columns to copy
+ """
+ if self.database_engine.can_native_upsert:
+ ins_columns = chain(
+ keyvalues,
+ copy_columns,
+ additive_relatives,
+ extra_dst_keyvalues,
+ extra_dst_insvalues,
+ )
+ sel_exprs = chain(
+ keyvalues,
+ copy_columns,
+ (
+ "?"
+ for _ in chain(
+ additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
+ )
+ ),
+ )
+ keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+ sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+ sets_ar = (
+ "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+ for f in additive_relatives
+ )
+
+ sql = """
+ INSERT INTO %(into_table)s (%(ins_columns)s)
+ SELECT %(sel_exprs)s
+ FROM %(src_table)s
+ WHERE %(keyvalues_where)s
+ ON CONFLICT (%(keyvalues)s)
+ DO UPDATE SET %(sets)s
+ """ % {
+ "into_table": into_table,
+ "ins_columns": ", ".join(ins_columns),
+ "sel_exprs": ", ".join(sel_exprs),
+ "keyvalues_where": " AND ".join(keyvalues_where),
+ "src_table": src_table,
+ "keyvalues": ", ".join(
+ chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+ ),
+ "sets": ", ".join(chain(sets_cc, sets_ar)),
+ }
+
+ qargs = list(
+ chain(
+ additive_relatives.values(),
+ extra_dst_keyvalues.values(),
+ extra_dst_insvalues.values(),
+ keyvalues.values(),
+ )
+ )
+ txn.execute(sql, qargs)
+ else:
+ self.database_engine.lock_table(txn, into_table)
+ src_row = self._simple_select_one_txn(
+ txn, src_table, keyvalues, copy_columns
+ )
+ all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
+ dest_current_row = self._simple_select_one_txn(
+ txn,
+ into_table,
+ keyvalues=all_dest_keyvalues,
+ retcols=list(chain(additive_relatives.keys(), copy_columns)),
+ allow_none=True,
+ )
+
+ if dest_current_row is None:
+ merged_dict = {
+ **keyvalues,
+ **extra_dst_keyvalues,
+ **extra_dst_insvalues,
+ **src_row,
+ **additive_relatives,
+ }
+ self._simple_insert_txn(txn, into_table, merged_dict)
else:
- sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
- table,
- field,
- field,
- id_col,
+ for (key, val) in additive_relatives.items():
+ src_row[key] = dest_current_row[key] + val
+ self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
+
+ def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
+ """Fetches the counts of events in the given range of stream IDs.
+
+ Args:
+ min_pos (int)
+ max_pos (int)
+
+ Returns:
+ Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
+ changes.
+ """
+
+ return self.runInteraction(
+ "stats_incremental_total_events_and_bytes",
+ self.get_changes_room_total_events_and_bytes_txn,
+ min_pos,
+ max_pos,
+ )
+
+ def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
+ """Gets the total_events and total_event_bytes counts for rooms and
+ senders, in a range of stream_orderings (including backfilled events).
+
+ Args:
+ txn
+ low_pos (int): Low stream ordering
+ high_pos (int): High stream ordering
+
+ Returns:
+ tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
+ room and user deltas for total_events/total_event_bytes in the
+ format of `stats_id` -> fields
+ """
+
+ if low_pos >= high_pos:
+ # nothing to do here.
+ return {}, {}
+
+ if isinstance(self.database_engine, PostgresEngine):
+ new_bytes_expression = "OCTET_LENGTH(json)"
+ else:
+ new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+ sql = """
+ SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+ FROM events INNER JOIN event_json USING (event_id)
+ WHERE (? < stream_ordering AND stream_ordering <= ?)
+ OR (? <= stream_ordering AND stream_ordering <= ?)
+ GROUP BY events.room_id
+ """ % (
+ new_bytes_expression,
+ )
+
+ txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
+
+ room_deltas = {
+ room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
+ for room_id, new_events, new_bytes in txn
+ }
+
+ sql = """
+ SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+ FROM events INNER JOIN event_json USING (event_id)
+ WHERE (? < stream_ordering AND stream_ordering <= ?)
+ OR (? <= stream_ordering AND stream_ordering <= ?)
+ GROUP BY events.sender
+ """ % (
+ new_bytes_expression,
+ )
+
+ txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
+
+ user_deltas = {
+ user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
+ for user_id, new_events, new_bytes in txn
+ if self.hs.is_mine_id(user_id)
+ }
+
+ return room_deltas, user_deltas
+
+ @defer.inlineCallbacks
+ def _calculate_and_set_initial_state_for_room(self, room_id):
+ """Calculate and insert an entry into room_stats_current.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[tuple[dict, dict, int]]: A tuple of room state, membership
+ counts and stream position.
+ """
+
+ def _fetch_current_state_stats(txn):
+ pos = self.get_room_max_stream_ordering()
+
+ rows = self._simple_select_many_txn(
+ txn,
+ table="current_state_events",
+ column="type",
+ iterable=[
+ EventTypes.Create,
+ EventTypes.JoinRules,
+ EventTypes.RoomHistoryVisibility,
+ EventTypes.Encryption,
+ EventTypes.Name,
+ EventTypes.Topic,
+ EventTypes.RoomAvatar,
+ EventTypes.CanonicalAlias,
+ ],
+ keyvalues={"room_id": room_id, "state_key": ""},
+ retcols=["event_id"],
+ )
+
+ event_ids = [row["event_id"] for row in rows]
+
+ txn.execute(
+ """
+ SELECT membership, count(*) FROM current_state_events
+ WHERE room_id = ? AND type = 'm.room.member'
+ GROUP BY membership
+ """,
+ (room_id,),
+ )
+ membership_counts = {membership: cnt for membership, cnt in txn}
+
+ txn.execute(
+ """
+ SELECT COALESCE(count(*), 0) FROM current_state_events
+ WHERE room_id = ?
+ """,
+ (room_id,),
+ )
+
+ current_state_events_count, = txn.fetchone()
+
+ users_in_room = self.get_users_in_room_txn(txn, room_id)
+
+ return (
+ event_ids,
+ membership_counts,
+ current_state_events_count,
+ users_in_room,
+ pos,
+ )
+
+ (
+ event_ids,
+ membership_counts,
+ current_state_events_count,
+ users_in_room,
+ pos,
+ ) = yield self.runInteraction(
+ "get_initial_state_for_room", _fetch_current_state_stats
+ )
+
+ state_event_map = yield self.get_events(event_ids, get_prev_content=False)
+
+ room_state = {
+ "join_rules": None,
+ "history_visibility": None,
+ "encryption": None,
+ "name": None,
+ "topic": None,
+ "avatar": None,
+ "canonical_alias": None,
+ "is_federatable": True,
+ }
+
+ for event in state_event_map.values():
+ if event.type == EventTypes.JoinRules:
+ room_state["join_rules"] = event.content.get("join_rule")
+ elif event.type == EventTypes.RoomHistoryVisibility:
+ room_state["history_visibility"] = event.content.get(
+ "history_visibility"
)
- txn.execute(sql, (value, stats_id, current_ts))
+ elif event.type == EventTypes.Encryption:
+ room_state["encryption"] = event.content.get("algorithm")
+ elif event.type == EventTypes.Name:
+ room_state["name"] = event.content.get("name")
+ elif event.type == EventTypes.Topic:
+ room_state["topic"] = event.content.get("topic")
+ elif event.type == EventTypes.RoomAvatar:
+ room_state["avatar"] = event.content.get("url")
+ elif event.type == EventTypes.CanonicalAlias:
+ room_state["canonical_alias"] = event.content.get("alias")
+ elif event.type == EventTypes.Create:
+ room_state["is_federatable"] = event.content.get("m.federate", True)
+
+ yield self.update_room_state(room_id, room_state)
+
+ local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]
+
+ yield self.update_stats_delta(
+ ts=self.clock.time_msec(),
+ stats_type="room",
+ stats_id=room_id,
+ fields={},
+ complete_with_stream_id=pos,
+ absolute_field_overrides={
+ "current_state_events": current_state_events_count,
+ "joined_members": membership_counts.get(Membership.JOIN, 0),
+ "invited_members": membership_counts.get(Membership.INVITE, 0),
+ "left_members": membership_counts.get(Membership.LEAVE, 0),
+ "banned_members": membership_counts.get(Membership.BAN, 0),
+ "local_users_in_room": len(local_users_in_room),
+ },
+ )
+
+ @defer.inlineCallbacks
+ def _calculate_and_set_initial_state_for_user(self, user_id):
+ def _calculate_and_set_initial_state_for_user_txn(txn):
+ pos = self._get_max_stream_id_in_current_state_deltas_txn(txn)
- return self.runInteraction("update_stats_delta", _update_stats_delta)
+ txn.execute(
+ """
+ SELECT COUNT(distinct room_id) FROM current_state_events
+ WHERE type = 'm.room.member' AND state_key = ?
+ AND membership = 'join'
+ """,
+ (user_id,),
+ )
+ count, = txn.fetchone()
+ return count, pos
+
+ joined_rooms, pos = yield self.runInteraction(
+ "calculate_and_set_initial_state_for_user",
+ _calculate_and_set_initial_state_for_user_txn,
+ )
+
+ yield self.update_stats_delta(
+ ts=self.clock.time_msec(),
+ stats_type="user",
+ stats_id=user_id,
+ fields={},
+ complete_with_stream_id=pos,
+ absolute_field_overrides={"joined_rooms": joined_rooms},
+ )
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 856c2ee8d8..490454f19a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -364,7 +364,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
the chunk of events returned.
"""
if from_key == to_key:
- return ([], from_key)
+ return [], from_key
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
@@ -374,7 +374,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
if not has_changed:
- return ([], from_key)
+ return [], from_key
def f(txn):
sql = (
@@ -407,7 +407,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# get.
key = from_key
- return (ret, key)
+ return ret, key
@defer.inlineCallbacks
def get_membership_changes_for_user(self, user_id, from_key, to_key):
@@ -496,7 +496,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
# Allow a zero limit here, and no-op.
if limit == 0:
- return ([], end_token)
+ return [], end_token
end_token = RoomStreamToken.parse(end_token)
@@ -511,7 +511,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# We want to return the results in ascending order.
rows.reverse()
- return (rows, token)
+ return rows, token
def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
@@ -783,7 +783,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
events = yield self.get_events_as_list(event_ids)
- return (upper_bound, events)
+ return upper_bound, events
def get_federation_out_pos(self, typ):
return self._simple_select_one_onecol(
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index f1c8d99419..cbb0a4810a 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -195,6 +195,6 @@ class ChainedIdGenerator(object):
with self._lock:
if self._unfinished_ids:
stream_id, chained_id = self._unfinished_ids[0]
- return (stream_id - 1, chained_id)
+ return stream_id - 1, chained_id
- return (self._current_max, self.chained_generator.get_current_token())
+ return self._current_max, self.chained_generator.get_current_token()
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index f7f5906a99..02994ab2a5 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -37,7 +37,7 @@ class SourcePaginationConfig(object):
self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None
def __repr__(self):
- return ("StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)") % (
+ return "StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)" % (
self.from_key,
self.to_key,
self.direction,
diff --git a/synapse/util/hash.py b/synapse/util/hash.py
deleted file mode 100644
index 359168704e..0000000000
--- a/synapse/util/hash.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2019 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import hashlib
-
-import unpaddedbase64
-
-
-def sha256_and_url_safe_base64(input_text):
- """SHA256 hash an input string, encode the digest as url-safe base64, and
- return
-
- :param input_text: string to hash
- :type input_text: str
-
- :returns a sha256 hashed and url-safe base64 encoded digest
- :rtype: str
- """
- digest = hashlib.sha256(input_text.encode()).digest()
- return unpaddedbase64.encode_base64(digest, urlsafe=True)
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index 0ad0a88165..e10296a5e4 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -283,4 +283,4 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
user, requester, displayname, by_admin=True
)
- return (user_id, token)
+ return user_id, token
diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py
index a8b858eb4f..7569b6fab5 100644
--- a/tests/handlers/test_stats.py
+++ b/tests/handlers/test_stats.py
@@ -13,16 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from mock import Mock
-
-from twisted.internet import defer
-
-from synapse.api.constants import EventTypes, Membership
+from synapse import storage
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
+# The expected number of state events in a fresh public room.
+EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM = 5
+# The expected number of state events in a fresh private room.
+EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM = 6
+
class StatsRoomTests(unittest.HomeserverTestCase):
@@ -33,7 +34,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
-
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
@@ -47,7 +47,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.get_success(
self.store._simple_insert(
"background_updates",
- {"update_name": "populate_stats_createtables", "progress_json": "{}"},
+ {"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
self.get_success(
@@ -56,7 +56,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
- "depends_on": "populate_stats_createtables",
+ "depends_on": "populate_stats_prepare",
},
)
)
@@ -64,18 +64,58 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.store._simple_insert(
"background_updates",
{
- "update_name": "populate_stats_cleanup",
+ "update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_cleanup",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_process_users",
+ },
+ )
+ )
+
+ def get_all_room_state(self):
+ return self.store._simple_select_list(
+ "room_stats_state", None, retcols=("name", "topic", "canonical_alias")
+ )
+
+ def _get_current_stats(self, stats_type, stat_id):
+ table, id_col = storage.stats.TYPE_TO_TABLE[stats_type]
+
+ cols = list(storage.stats.ABSOLUTE_STATS_FIELDS[stats_type]) + list(
+ storage.stats.PER_SLICE_FIELDS[stats_type]
+ )
+
+ end_ts = self.store.quantise_stats_time(self.reactor.seconds() * 1000)
+
+ return self.get_success(
+ self.store._simple_select_one(
+ table + "_historical",
+ {id_col: stat_id, end_ts: end_ts},
+ cols,
+ allow_none=True,
+ )
+ )
+
+ def _perform_background_initial_update(self):
+ # Do the initial population of the stats via the background update
+ self._add_background_updates()
+
+ while not self.get_success(self.store.has_completed_background_updates()):
+ self.get_success(self.store.do_next_background_update(100), by=0.1)
def test_initial_room(self):
"""
The background updates will build the table from scratch.
"""
- r = self.get_success(self.store.get_all_room_state())
+ r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 0)
# Disable stats
@@ -91,7 +131,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
)
# Stats disabled, shouldn't have done anything
- r = self.get_success(self.store.get_all_room_state())
+ r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 0)
# Enable stats
@@ -104,7 +144,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
- r = self.get_success(self.store.get_all_room_state())
+ r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["topic"], "foo")
@@ -114,6 +154,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
+
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
@@ -138,12 +179,18 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
- self.get_success(self.store.update_stats_stream_pos(None))
+ self.get_success(
+ self.store._simple_update_one(
+ table="stats_incremental_position",
+ keyvalues={},
+ updatevalues={"stream_id": 0},
+ )
+ )
self.get_success(
self.store._simple_insert(
"background_updates",
- {"update_name": "populate_stats_createtables", "progress_json": "{}"},
+ {"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
@@ -154,6 +201,8 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
+ # orig_delta_processor = self.store.
+
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
@@ -185,8 +234,15 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
- # Get the deltas! There should be two -- day 1, and day 2.
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
+ # self.handler.notify_new_event()
+
+ # We need to let the delta processor advanceā¦
+ self.pump(10 * 60)
+
+ # Get the slices! There should be two -- day 1, and day 2.
+ r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
+
+ self.assertEqual(len(r), 2)
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
@@ -194,111 +250,476 @@ class StatsRoomTests(unittest.HomeserverTestCase):
# The newest has 3
self.assertEqual(r[0]["joined_members"], 3)
- def test_incorrect_state_transition(self):
- """
- If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
- (JOIN, INVITE, LEAVE, BAN), an error is raised.
- """
- events = {
- "a1": {"membership": Membership.LEAVE},
- "a2": {"membership": "not a real thing"},
- }
-
- def get_event(event_id, allow_none=True):
- m = Mock()
- m.content = events[event_id]
- d = defer.Deferred()
- self.reactor.callLater(0.0, d.callback, m)
- return d
-
- def get_received_ts(event_id):
- return defer.succeed(1)
-
- self.store.get_received_ts = get_received_ts
- self.store.get_event = get_event
-
- deltas = [
- {
- "type": EventTypes.Member,
- "state_key": "some_user",
- "room_id": "room",
- "event_id": "a1",
- "prev_event_id": "a2",
- "stream_id": 60,
- }
- ]
-
- f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
+ def test_create_user(self):
+ """
+ When we create a user, it should have statistics already ready.
+ """
+
+ u1 = self.register_user("u1", "pass")
+
+ u1stats = self._get_current_stats("user", u1)
+
+ self.assertIsNotNone(u1stats)
+
+ # not in any rooms by default
+ self.assertEqual(u1stats["joined_rooms"], 0)
+
+ def test_create_room(self):
+ """
+ When we create a room, it should have statistics already ready.
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+ r1stats = self._get_current_stats("room", r1)
+ r2 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
+ r2stats = self._get_current_stats("room", r2)
+
+ self.assertIsNotNone(r1stats)
+ self.assertIsNotNone(r2stats)
+
+ # contains the default things you'd expect in a fresh room
self.assertEqual(
- f.value.args[0], "'not a real thing' is not a valid prev_membership"
- )
-
- # And the other way...
- deltas = [
- {
- "type": EventTypes.Member,
- "state_key": "some_user",
- "room_id": "room",
- "event_id": "a2",
- "prev_event_id": "a1",
- "stream_id": 100,
- }
- ]
-
- f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
+ r1stats["total_events"],
+ EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM,
+ "Wrong number of total_events in new room's stats!"
+ " You may need to update this if more state events are added to"
+ " the room creation process.",
+ )
self.assertEqual(
- f.value.args[0], "'not a real thing' is not a valid membership"
+ r2stats["total_events"],
+ EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
+ "Wrong number of total_events in new room's stats!"
+ " You may need to update this if more state events are added to"
+ " the room creation process.",
)
- def test_redacted_prev_event(self):
+ self.assertEqual(
+ r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
+ )
+ self.assertEqual(
+ r2stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM
+ )
+
+ self.assertEqual(r1stats["joined_members"], 1)
+ self.assertEqual(r1stats["invited_members"], 0)
+ self.assertEqual(r1stats["banned_members"], 0)
+
+ self.assertEqual(r2stats["joined_members"], 1)
+ self.assertEqual(r2stats["invited_members"], 0)
+ self.assertEqual(r2stats["banned_members"], 0)
+
+ def test_send_message_increments_total_events(self):
"""
- If the prev_event does not exist, then it is assumed to be a LEAVE.
+ When we send a message, it increments total_events.
"""
+
+ self._perform_background_initial_update()
+
u1 = self.register_user("u1", "pass")
- u1_token = self.login("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+ r1stats_ante = self._get_current_stats("room", r1)
- room_1 = self.helper.create_room_as(u1, tok=u1_token)
+ self.helper.send(r1, "hiss", tok=u1token)
- # Do the initial population of the user directory via the background update
- self._add_background_updates()
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+
+ def test_send_state_event_nonoverwriting(self):
+ """
+ When we send a non-overwriting state event, it increments total_events AND current_state_events
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ self.helper.send_state(
+ r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
+ )
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.send_state(
+ r1, "cat.hissing", {"value": False}, tok=u1token, state_key="moggy"
+ )
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 1,
+ )
+
+ def test_send_state_event_overwriting(self):
+ """
+ When we send an overwriting state event, it increments total_events ONLY
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ self.helper.send_state(
+ r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
+ )
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.send_state(
+ r1, "cat.hissing", {"value": False}, tok=u1token, state_key="tabby"
+ )
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+
+ def test_join_first_time(self):
+ """
+ When a user joins a room for the first time, total_events, current_state_events and
+ joined_members should increase by exactly 1.
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 1,
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], 1
+ )
+
+ def test_join_after_leave(self):
+ """
+ When a user joins a room after being previously left, total_events and
+ joined_members should increase by exactly 1.
+ current_state_events should not increase.
+ left_members should decrease by exactly 1.
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ self.helper.join(r1, u2, tok=u2token)
+ self.helper.leave(r1, u2, tok=u2token)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
+ )
+ self.assertEqual(
+ r1stats_post["left_members"] - r1stats_ante["left_members"], -1
+ )
+
+ def test_invited(self):
+ """
+ When a user invites another user, current_state_events, total_events and
+ invited_members should increase by exactly 1.
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.invite(r1, u1, u2, tok=u1token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 1,
+ )
+ self.assertEqual(
+ r1stats_post["invited_members"] - r1stats_ante["invited_members"], +1
+ )
+
+ def test_join_after_invite(self):
+ """
+ When a user joins a room after being invited, total_events and
+ joined_members should increase by exactly 1.
+ current_state_events should not increase.
+ invited_members should decrease by exactly 1.
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ self.helper.invite(r1, u1, u2, tok=u1token)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
+ )
+ self.assertEqual(
+ r1stats_post["invited_members"] - r1stats_ante["invited_members"], -1
+ )
+
+ def test_left(self):
+ """
+ When a user leaves a room after joining, total_events and
+ left_members should increase by exactly 1.
+ current_state_events should not increase.
+ joined_members should decrease by exactly 1.
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.leave(r1, u2, tok=u2token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+ self.assertEqual(
+ r1stats_post["left_members"] - r1stats_ante["left_members"], +1
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
+ )
+
+ def test_banned(self):
+ """
+ When a user is banned from a room after joining, total_events and
+ left_members should increase by exactly 1.
+ current_state_events should not increase.
+ banned_members should decrease by exactly 1.
+ """
+
+ self._perform_background_initial_update()
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.change_membership(r1, u1, u2, "ban", tok=u1token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+ self.assertEqual(
+ r1stats_post["banned_members"] - r1stats_ante["banned_members"], +1
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
+ )
+
+ def test_initial_background_update(self):
+ """
+ Test that statistics can be generated by the initial background update
+ handler.
+
+ This test also tests that stats rows are not created for new subjects
+ when stats are disabled. However, it may be desirable to change this
+ behaviour eventually to still keep current rows.
+ """
+
+ self.hs.config.stats_enabled = False
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ # test that these subjects, which were created during a time of disabled
+ # stats, do not have stats.
+ self.assertIsNone(self._get_current_stats("room", r1))
+ self.assertIsNone(self._get_current_stats("user", u1))
+
+ self.hs.config.stats_enabled = True
+
+ self._perform_background_initial_update()
+
+ r1stats = self._get_current_stats("room", r1)
+ u1stats = self._get_current_stats("user", u1)
+
+ self.assertEqual(r1stats["joined_members"], 1)
+ self.assertEqual(
+ r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
+ )
+
+ self.assertEqual(u1stats["joined_rooms"], 1)
+
+ def test_incomplete_stats(self):
+ """
+ This tests that we track incomplete statistics.
+
+ We first test that incomplete stats are incrementally generated,
+ following the preparation of a background regen.
+
+ We then test that these incomplete rows are completed by the background
+ regen.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+ u3 = self.register_user("u3", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
+
+ # preparation stage of the initial background update
+ # Ugh, have to reset this flag
+ self.store._all_done = False
+
+ self.get_success(
+ self.store._simple_delete(
+ "room_stats_current", {"1": 1}, "test_delete_stats"
+ )
+ )
+ self.get_success(
+ self.store._simple_delete(
+ "user_stats_current", {"1": 1}, "test_delete_stats"
+ )
+ )
+
+ self.helper.invite(r1, u1, u2, tok=u1token)
+ self.helper.join(r1, u2, tok=u2token)
+ self.helper.invite(r1, u1, u3, tok=u1token)
+ self.helper.send(r1, "thou shalt yield", tok=u1token)
+
+ # now do the background updates
+
+ self.store._all_done = False
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_process_rooms",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_prepare",
+ },
+ )
+ )
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_process_users",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_process_rooms",
+ },
+ )
+ )
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_cleanup",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_process_users",
+ },
+ )
+ )
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
- events = {"a1": None, "a2": {"membership": Membership.JOIN}}
-
- def get_event(event_id, allow_none=True):
- if events.get(event_id):
- m = Mock()
- m.content = events[event_id]
- else:
- m = None
- d = defer.Deferred()
- self.reactor.callLater(0.0, d.callback, m)
- return d
-
- def get_received_ts(event_id):
- return defer.succeed(1)
-
- self.store.get_received_ts = get_received_ts
- self.store.get_event = get_event
-
- deltas = [
- {
- "type": EventTypes.Member,
- "state_key": "some_user:test",
- "room_id": room_1,
- "event_id": "a2",
- "prev_event_id": "a1",
- "stream_id": 100,
- }
- ]
-
- # Handle our fake deltas, which has a user going from LEAVE -> JOIN.
- self.get_success(self.handler._handle_deltas(deltas))
-
- # One delta, with two joined members -- the room creator, and our fake
- # user.
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
- self.assertEqual(len(r), 1)
- self.assertEqual(r[0]["joined_members"], 2)
+ r1stats_complete = self._get_current_stats("room", r1)
+ u1stats_complete = self._get_current_stats("user", u1)
+ u2stats_complete = self._get_current_stats("user", u2)
+
+ # now we make our assertions
+
+ # check that _complete rows are complete and correct
+ self.assertEqual(r1stats_complete["joined_members"], 2)
+ self.assertEqual(r1stats_complete["invited_members"], 1)
+
+ self.assertEqual(
+ r1stats_complete["current_state_events"],
+ 2 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
+ )
+
+ self.assertEqual(u1stats_complete["joined_rooms"], 1)
+ self.assertEqual(u2stats_complete["joined_rooms"], 1)
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index 9915367144..cdded88b7f 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -128,8 +128,12 @@ class RestHelper(object):
return channel.json_body
- def send_state(self, room_id, event_type, body, tok, expect_code=200):
- path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
+ def send_state(self, room_id, event_type, body, tok, expect_code=200, state_key=""):
+ path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % (
+ room_id,
+ event_type,
+ state_key,
+ )
if tok:
path = path + "?access_token=%s" % tok
diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index bb867150f4..ab4d7d70d0 100644
--- a/tests/rest/client/v2_alpha/test_register.py
+++ b/tests/rest/client/v2_alpha/test_register.py
@@ -472,7 +472,7 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase):
added_at=now,
)
)
- return (user_id, tok)
+ return user_id, tok
def test_manual_email_send_expired_account(self):
user_id = self.register_user("kermit", "monkey")
diff --git a/tests/server.py b/tests/server.py
index c8269619b1..e397ebe8fa 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -338,7 +338,7 @@ def setup_test_homeserver(cleanup_func, *args, **kwargs):
def get_clock():
clock = ThreadedMemoryReactorClock()
hs_clock = Clock(clock)
- return (clock, hs_clock)
+ return clock, hs_clock
@attr.s(cmp=False)
diff --git a/tests/test_server.py b/tests/test_server.py
index 2a7d407c98..98fef21d55 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -57,7 +57,7 @@ class JsonResourceTests(unittest.TestCase):
def _callback(request, **kwargs):
got_kwargs.update(kwargs)
- return (200, kwargs)
+ return 200, kwargs
res = JsonResource(self.homeserver)
res.register_paths(
diff --git a/tests/test_state.py b/tests/test_state.py
index 6d33566f47..610ec9fb46 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -106,7 +106,7 @@ class StateGroupStore(object):
}
def get_state_group_delta(self, name):
- return (None, None)
+ return None, None
def register_events(self, events):
for e in events:
diff --git a/tests/utils.py b/tests/utils.py
index f1eb9a545c..46ef2959f2 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -464,7 +464,7 @@ class MockHttpResource(HttpServer):
args = [urlparse.unquote(u) for u in matcher.groups()]
(code, response) = yield func(mock_request, *args)
- return (code, response)
+ return code, response
except CodeMessageException as e:
return (e.code, cs_error(e.msg, code=e.errcode))
diff --git a/tox.ini b/tox.ini
index f9a3b7e49a..7cb40847b5 100644
--- a/tox.ini
+++ b/tox.ini
@@ -7,6 +7,7 @@ deps =
python-subunit
junitxml
coverage
+ coverage-enable-subprocess
parameterized
# cyptography 2.2 requires setuptools >= 18.5
@@ -43,13 +44,13 @@ whitelist_externals =
setenv =
{[base]setenv}
postgres: SYNAPSE_POSTGRES = 1
+ TOP={toxinidir}
passenv = *
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
# Add this so that coverage will run on subprocesses
- sh -c 'echo "import coverage; coverage.process_startup()" > {envsitepackagesdir}/../sitecustomize.py'
{envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
# As of twisted 16.4, trial tries to import the tests as a package (previously
@@ -75,8 +76,6 @@ commands =
# )
usedevelop=true
-
-
# A test suite for the oldest supported versions of Python libraries, to catch
# any uses of APIs not available in them.
[testenv:py35-old]
@@ -88,6 +87,7 @@ deps =
mock
lxml
coverage
+ coverage-enable-subprocess
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
@@ -96,15 +96,11 @@ commands =
# OpenSSL 1.1 compiled cryptography (as older ones don't compile on Travis).
/bin/sh -c 'python -m synapse.python_dependencies | sed -e "s/>=/==/g" -e "s/psycopg2==2.6//" -e "s/pyopenssl==16.0.0/pyopenssl==17.0.0/" | xargs -d"\n" pip install'
- # Add this so that coverage will run on subprocesses
- /bin/sh -c 'echo "import coverage; coverage.process_startup()" > {envsitepackagesdir}/../sitecustomize.py'
-
# Install Synapse itself. This won't update any libraries.
pip install -e .
{envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
-
[testenv:packaging]
skip_install=True
deps =
@@ -137,15 +133,15 @@ basepython = python3.6
[testenv:check-sampleconfig]
commands = {toxinidir}/scripts-dev/generate_sample_config --check
-[testenv:codecov]
+[testenv:combine]
skip_install = True
deps =
coverage
- codecov
-commands =
+whitelist_externals =
+ bash
+commands=
coverage combine
- coverage xml
- codecov -X gcov
+ coverage report
[testenv:mypy]
basepython = python3.5
@@ -155,4 +151,4 @@ deps =
extras = all
commands = mypy --ignore-missing-imports \
synapse/logging/_structured.py \
- synapse/logging/_terse_json.py
\ No newline at end of file
+ synapse/logging/_terse_json.py
|