diff --git a/.travis.yml b/.travis.yml
index 2077f6af72..fd41841c77 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,12 +1,27 @@
sudo: false
language: python
-# tell travis to cache ~/.cache/pip
-cache: pip
+cache:
+ directories:
+ # we only bother to cache the wheels; parts of the http cache get
+ # invalidated every build (because they get served with a max-age of 600
+ # seconds), which means that we end up re-uploading the whole cache for
+ # every build, which is time-consuming In any case, it's not obvious that
+ # downloading the cache from S3 would be much faster than downloading the
+ # originals from pypi.
+ #
+ - $HOME/.cache/pip/wheels
-before_script:
- - git remote set-branches --add origin develop
- - git fetch origin develop
+# don't clone the whole repo history, one commit will do
+git:
+ depth: 1
+
+# only build branches we care about (PRs are built seperately)
+branches:
+ only:
+ - master
+ - develop
+ - /^release-v/
matrix:
fast_finish: true
@@ -14,8 +29,8 @@ matrix:
- python: 2.7
env: TOX_ENV=packaging
- - python: 2.7
- env: TOX_ENV=pep8
+ - python: 3.6
+ env: TOX_ENV="pep8,check_isort"
- python: 2.7
env: TOX_ENV=py27
@@ -39,11 +54,14 @@ matrix:
services:
- postgresql
- - python: 3.6
- env: TOX_ENV=check_isort
-
- - python: 3.6
+ - # we only need to check for the newsfragment if it's a PR build
+ if: type = pull_request
+ python: 3.6
env: TOX_ENV=check-newsfragment
+ script:
+ - git remote set-branches --add origin develop
+ - git fetch origin develop
+ - tox -e $TOX_ENV
install:
- pip install tox
diff --git a/CHANGES.md b/CHANGES.md
index 5f598559a0..fb98c934c0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,10 +11,6 @@ If you have email notifications enabled, you should ensure that
have installed customised templates, or leave it unset to use the default
templates.
-The configuration parser will try to detect the situation where
-`email.template_dir` is incorrectly set to `res/templates` and do the right
-thing, but will warn about this.
-
Synapse 0.33.7rc2 (2018-10-17)
==============================
diff --git a/README.rst b/README.rst
index e1ea351f84..456a3d9d43 100644
--- a/README.rst
+++ b/README.rst
@@ -174,6 +174,12 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
Dockerfile to automate a synapse server in a single Docker image, at
https://hub.docker.com/r/avhost/docker-matrix/tags/
+Slavi Pantaleev has created an Ansible playbook,
+which installs the offical Docker image of Matrix Synapse
+along with many other Matrix-related services (Postgres database, riot-web, coturn, mxisd, SSL support, etc.).
+For more details, see
+https://github.com/spantaleev/matrix-docker-ansible-deploy
+
Configuring Synapse
-------------------
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 201d298123..55c77eedde 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -61,10 +61,6 @@ If you have email notifications enabled, you should ensure that
have installed customised templates, or leave it unset to use the default
templates.
-The configuration parser will try to detect the situation where
-``email.template_dir`` is incorrectly set to ``res/templates`` and do the right
-thing, but will warn about this.
-
Upgrading to v0.27.3
====================
diff --git a/changelog.d/3698.misc b/changelog.d/3698.misc
new file mode 100644
index 0000000000..12537e76f2
--- /dev/null
+++ b/changelog.d/3698.misc
@@ -0,0 +1 @@
+Add information about the [matrix-docker-ansible-deploy](https://github.com/spantaleev/matrix-docker-ansible-deploy) playbook
diff --git a/changelog.d/3786.misc b/changelog.d/3786.misc
new file mode 100644
index 0000000000..a9f9a2bb27
--- /dev/null
+++ b/changelog.d/3786.misc
@@ -0,0 +1 @@
+Add initial implementation of new state resolution algorithm
diff --git a/changelog.d/3969.bugfix b/changelog.d/3969.bugfix
new file mode 100644
index 0000000000..ca2759e91e
--- /dev/null
+++ b/changelog.d/3969.bugfix
@@ -0,0 +1 @@
+Fix HTTP error response codes for federated group requests.
diff --git a/changelog.d/3975.feature b/changelog.d/3975.feature
new file mode 100644
index 0000000000..162f30a532
--- /dev/null
+++ b/changelog.d/3975.feature
@@ -0,0 +1 @@
+Servers with auto-join rooms will now automatically create those rooms when the first user registers
diff --git a/changelog.d/4060.bugfix b/changelog.d/4060.bugfix
new file mode 100644
index 0000000000..78d69a8819
--- /dev/null
+++ b/changelog.d/4060.bugfix
@@ -0,0 +1 @@
+Manhole now works again on Python 3, instead of failing with a "couldn't match all kex parts" when connecting.
diff --git a/changelog.d/4061.bugfix b/changelog.d/4061.bugfix
new file mode 100644
index 0000000000..94ffcf7a51
--- /dev/null
+++ b/changelog.d/4061.bugfix
@@ -0,0 +1 @@
+Fix some metrics being racy and causing exceptions when polled by Prometheus.
diff --git a/changelog.d/4067.bugfix b/changelog.d/4067.bugfix
new file mode 100644
index 0000000000..78d69a8819
--- /dev/null
+++ b/changelog.d/4067.bugfix
@@ -0,0 +1 @@
+Manhole now works again on Python 3, instead of failing with a "couldn't match all kex parts" when connecting.
diff --git a/changelog.d/4068.bugfix b/changelog.d/4068.bugfix
new file mode 100644
index 0000000000..74bda7491f
--- /dev/null
+++ b/changelog.d/4068.bugfix
@@ -0,0 +1 @@
+Fix bug which prevented email notifications from being sent unless an absolute path was given for `email_templates`.
\ No newline at end of file
diff --git a/changelog.d/4068.misc b/changelog.d/4068.misc
new file mode 100644
index 0000000000..db6c4ade59
--- /dev/null
+++ b/changelog.d/4068.misc
@@ -0,0 +1 @@
+Make the Python scripts in the top-level scripts folders meet pep8 and pass flake8.
diff --git a/changelog.d/4073.misc b/changelog.d/4073.misc
new file mode 100644
index 0000000000..fc304bef06
--- /dev/null
+++ b/changelog.d/4073.misc
@@ -0,0 +1 @@
+Add psutil as an explicit dependency
diff --git a/changelog.d/4074.bugfix b/changelog.d/4074.bugfix
new file mode 100644
index 0000000000..b3b6b00243
--- /dev/null
+++ b/changelog.d/4074.bugfix
@@ -0,0 +1 @@
+Correctly account for cpu usage by background threads
diff --git a/changelog.d/4075.misc b/changelog.d/4075.misc
new file mode 100644
index 0000000000..d08b8cc271
--- /dev/null
+++ b/changelog.d/4075.misc
@@ -0,0 +1 @@
+Clean up threading and logcontexts in pushers
\ No newline at end of file
diff --git a/changelog.d/4076.misc b/changelog.d/4076.misc
new file mode 100644
index 0000000000..9dd000decf
--- /dev/null
+++ b/changelog.d/4076.misc
@@ -0,0 +1 @@
+Correctly manage logcontexts during startup to fix some "Unexpected logging context" warnings
\ No newline at end of file
diff --git a/changelog.d/4077.misc b/changelog.d/4077.misc
new file mode 100644
index 0000000000..52ca4c1de2
--- /dev/null
+++ b/changelog.d/4077.misc
@@ -0,0 +1 @@
+Give some more things logcontexts
diff --git a/changelog.d/4082.misc b/changelog.d/4082.misc
new file mode 100644
index 0000000000..a81faf5e9b
--- /dev/null
+++ b/changelog.d/4082.misc
@@ -0,0 +1 @@
+Clean up some bits of code which were flagged by the linter
diff --git a/changelog.d/4083.bugfix b/changelog.d/4083.bugfix
new file mode 100644
index 0000000000..b3b08cdfa6
--- /dev/null
+++ b/changelog.d/4083.bugfix
@@ -0,0 +1 @@
+Fix bug which prevented backslashes being used in event field filters
\ No newline at end of file
diff --git a/scripts-dev/check_auth.py b/scripts-dev/check_auth.py
index 4fa8792a5f..b3d11f49ec 100644
--- a/scripts-dev/check_auth.py
+++ b/scripts-dev/check_auth.py
@@ -1,21 +1,20 @@
-from synapse.events import FrozenEvent
-from synapse.api.auth import Auth
-
-from mock import Mock
+from __future__ import print_function
import argparse
import itertools
import json
import sys
+from mock import Mock
+
+from synapse.api.auth import Auth
+from synapse.events import FrozenEvent
+
def check_auth(auth, auth_chain, events):
auth_chain.sort(key=lambda e: e.depth)
- auth_map = {
- e.event_id: e
- for e in auth_chain
- }
+ auth_map = {e.event_id: e for e in auth_chain}
create_events = {}
for e in auth_chain:
@@ -25,31 +24,26 @@ def check_auth(auth, auth_chain, events):
for e in itertools.chain(auth_chain, events):
auth_events_list = [auth_map[i] for i, _ in e.auth_events]
- auth_events = {
- (e.type, e.state_key): e
- for e in auth_events_list
- }
+ auth_events = {(e.type, e.state_key): e for e in auth_events_list}
auth_events[("m.room.create", "")] = create_events[e.room_id]
try:
auth.check(e, auth_events=auth_events)
except Exception as ex:
- print "Failed:", e.event_id, e.type, e.state_key
- print "Auth_events:", auth_events
- print ex
- print json.dumps(e.get_dict(), sort_keys=True, indent=4)
+ print("Failed:", e.event_id, e.type, e.state_key)
+ print("Auth_events:", auth_events)
+ print(ex)
+ print(json.dumps(e.get_dict(), sort_keys=True, indent=4))
# raise
- print "Success:", e.event_id, e.type, e.state_key
+ print("Success:", e.event_id, e.type, e.state_key)
+
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
- 'json',
- nargs='?',
- type=argparse.FileType('r'),
- default=sys.stdin,
+ 'json', nargs='?', type=argparse.FileType('r'), default=sys.stdin
)
args = parser.parse_args()
diff --git a/scripts-dev/check_event_hash.py b/scripts-dev/check_event_hash.py
index 7ccae34d48..8535f99697 100644
--- a/scripts-dev/check_event_hash.py
+++ b/scripts-dev/check_event_hash.py
@@ -1,10 +1,15 @@
-from synapse.crypto.event_signing import *
-from unpaddedbase64 import encode_base64
-
import argparse
import hashlib
-import sys
import json
+import logging
+import sys
+
+from unpaddedbase64 import encode_base64
+
+from synapse.crypto.event_signing import (
+ check_event_content_hash,
+ compute_event_reference_hash,
+)
class dictobj(dict):
@@ -24,27 +29,26 @@ class dictobj(dict):
def main():
parser = argparse.ArgumentParser()
- parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'),
- default=sys.stdin)
+ parser.add_argument(
+ "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin
+ )
args = parser.parse_args()
logging.basicConfig()
event_json = dictobj(json.load(args.input_json))
- algorithms = {
- "sha256": hashlib.sha256,
- }
+ algorithms = {"sha256": hashlib.sha256}
for alg_name in event_json.hashes:
if check_event_content_hash(event_json, algorithms[alg_name]):
- print "PASS content hash %s" % (alg_name,)
+ print("PASS content hash %s" % (alg_name,))
else:
- print "FAIL content hash %s" % (alg_name,)
+ print("FAIL content hash %s" % (alg_name,))
for algorithm in algorithms.values():
name, h_bytes = compute_event_reference_hash(event_json, algorithm)
- print "Reference hash %s: %s" % (name, encode_base64(h_bytes))
+ print("Reference hash %s: %s" % (name, encode_base64(h_bytes)))
-if __name__=="__main__":
- main()
+if __name__ == "__main__":
+ main()
diff --git a/scripts-dev/check_signature.py b/scripts-dev/check_signature.py
index 079577908a..612f17ca7f 100644
--- a/scripts-dev/check_signature.py
+++ b/scripts-dev/check_signature.py
@@ -1,15 +1,15 @@
-from signedjson.sign import verify_signed_json
-from signedjson.key import decode_verify_key_bytes, write_signing_keys
-from unpaddedbase64 import decode_base64
-
-import urllib2
+import argparse
import json
+import logging
import sys
+import urllib2
+
import dns.resolver
-import pprint
-import argparse
-import logging
+from signedjson.key import decode_verify_key_bytes, write_signing_keys
+from signedjson.sign import verify_signed_json
+from unpaddedbase64 import decode_base64
+
def get_targets(server_name):
if ":" in server_name:
@@ -23,6 +23,7 @@ def get_targets(server_name):
except dns.resolver.NXDOMAIN:
yield (server_name, 8448)
+
def get_server_keys(server_name, target, port):
url = "https://%s:%i/_matrix/key/v1" % (target, port)
keys = json.load(urllib2.urlopen(url))
@@ -33,12 +34,14 @@ def get_server_keys(server_name, target, port):
verify_keys[key_id] = verify_key
return verify_keys
+
def main():
parser = argparse.ArgumentParser()
parser.add_argument("signature_name")
- parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'),
- default=sys.stdin)
+ parser.add_argument(
+ "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin
+ )
args = parser.parse_args()
logging.basicConfig()
@@ -48,24 +51,23 @@ def main():
for target, port in get_targets(server_name):
try:
keys = get_server_keys(server_name, target, port)
- print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port)
+ print("Using keys from https://%s:%s/_matrix/key/v1" % (target, port))
write_signing_keys(sys.stdout, keys.values())
break
- except:
+ except Exception:
logging.exception("Error talking to %s:%s", target, port)
json_to_check = json.load(args.input_json)
- print "Checking JSON:"
+ print("Checking JSON:")
for key_id in json_to_check["signatures"][args.signature_name]:
try:
key = keys[key_id]
verify_signed_json(json_to_check, args.signature_name, key)
- print "PASS %s" % (key_id,)
- except:
+ print("PASS %s" % (key_id,))
+ except Exception:
logging.exception("Check for key %s failed" % (key_id,))
- print "FAIL %s" % (key_id,)
+ print("FAIL %s" % (key_id,))
if __name__ == '__main__':
main()
-
diff --git a/scripts-dev/convert_server_keys.py b/scripts-dev/convert_server_keys.py
index 151551f22c..dde8596697 100644
--- a/scripts-dev/convert_server_keys.py
+++ b/scripts-dev/convert_server_keys.py
@@ -1,13 +1,21 @@
-import psycopg2
-import yaml
-import sys
+import hashlib
import json
+import sys
import time
-import hashlib
-from unpaddedbase64 import encode_base64
+
+import six
+
+import psycopg2
+import yaml
+from canonicaljson import encode_canonical_json
from signedjson.key import read_signing_keys
from signedjson.sign import sign_json
-from canonicaljson import encode_canonical_json
+from unpaddedbase64 import encode_base64
+
+if six.PY2:
+ db_type = six.moves.builtins.buffer
+else:
+ db_type = memoryview
def select_v1_keys(connection):
@@ -39,7 +47,9 @@ def select_v2_json(connection):
cursor.close()
results = {}
for server_name, key_id, key_json in rows:
- results.setdefault(server_name, {})[key_id] = json.loads(str(key_json).decode("utf-8"))
+ results.setdefault(server_name, {})[key_id] = json.loads(
+ str(key_json).decode("utf-8")
+ )
return results
@@ -47,10 +57,7 @@ def convert_v1_to_v2(server_name, valid_until, keys, certificate):
return {
"old_verify_keys": {},
"server_name": server_name,
- "verify_keys": {
- key_id: {"key": key}
- for key_id, key in keys.items()
- },
+ "verify_keys": {key_id: {"key": key} for key_id, key in keys.items()},
"valid_until_ts": valid_until,
"tls_fingerprints": [fingerprint(certificate)],
}
@@ -65,7 +72,7 @@ def rows_v2(server, json):
valid_until = json["valid_until_ts"]
key_json = encode_canonical_json(json)
for key_id in json["verify_keys"]:
- yield (server, key_id, "-", valid_until, valid_until, buffer(key_json))
+ yield (server, key_id, "-", valid_until, valid_until, db_type(key_json))
def main():
@@ -87,7 +94,7 @@ def main():
result = {}
for server in keys:
- if not server in json:
+ if server not in json:
v2_json = convert_v1_to_v2(
server, valid_until, keys[server], certificates[server]
)
@@ -96,10 +103,7 @@ def main():
yaml.safe_dump(result, sys.stdout, default_flow_style=False)
- rows = list(
- row for server, json in result.items()
- for row in rows_v2(server, json)
- )
+ rows = list(row for server, json in result.items() for row in rows_v2(server, json))
cursor = connection.cursor()
cursor.executemany(
@@ -107,7 +111,7 @@ def main():
" server_name, key_id, from_server,"
" ts_added_ms, ts_valid_until_ms, key_json"
") VALUES (%s, %s, %s, %s, %s, %s)",
- rows
+ rows,
)
connection.commit()
diff --git a/scripts-dev/definitions.py b/scripts-dev/definitions.py
index 47dac7772d..1deb0fe2b7 100755
--- a/scripts-dev/definitions.py
+++ b/scripts-dev/definitions.py
@@ -1,8 +1,16 @@
#! /usr/bin/python
+from __future__ import print_function
+
+import argparse
import ast
+import os
+import re
+import sys
+
import yaml
+
class DefinitionVisitor(ast.NodeVisitor):
def __init__(self):
super(DefinitionVisitor, self).__init__()
@@ -42,15 +50,18 @@ def non_empty(defs):
functions = {name: non_empty(f) for name, f in defs['def'].items()}
classes = {name: non_empty(f) for name, f in defs['class'].items()}
result = {}
- if functions: result['def'] = functions
- if classes: result['class'] = classes
+ if functions:
+ result['def'] = functions
+ if classes:
+ result['class'] = classes
names = defs['names']
uses = []
for name in names.get('Load', ()):
if name not in names.get('Param', ()) and name not in names.get('Store', ()):
uses.append(name)
uses.extend(defs['attrs'])
- if uses: result['uses'] = uses
+ if uses:
+ result['uses'] = uses
result['names'] = names
result['attrs'] = defs['attrs']
return result
@@ -95,7 +106,6 @@ def used_names(prefix, item, defs, names):
if __name__ == '__main__':
- import sys, os, argparse, re
parser = argparse.ArgumentParser(description='Find definitions.')
parser.add_argument(
@@ -105,24 +115,28 @@ if __name__ == '__main__':
"--ignore", action="append", metavar="REGEXP", help="Ignore a pattern"
)
parser.add_argument(
- "--pattern", action="append", metavar="REGEXP",
- help="Search for a pattern"
+ "--pattern", action="append", metavar="REGEXP", help="Search for a pattern"
)
parser.add_argument(
- "directories", nargs='+', metavar="DIR",
- help="Directories to search for definitions"
+ "directories",
+ nargs='+',
+ metavar="DIR",
+ help="Directories to search for definitions",
)
parser.add_argument(
- "--referrers", default=0, type=int,
- help="Include referrers up to the given depth"
+ "--referrers",
+ default=0,
+ type=int,
+ help="Include referrers up to the given depth",
)
parser.add_argument(
- "--referred", default=0, type=int,
- help="Include referred down to the given depth"
+ "--referred",
+ default=0,
+ type=int,
+ help="Include referred down to the given depth",
)
parser.add_argument(
- "--format", default="yaml",
- help="Output format, one of 'yaml' or 'dot'"
+ "--format", default="yaml", help="Output format, one of 'yaml' or 'dot'"
)
args = parser.parse_args()
@@ -162,7 +176,7 @@ if __name__ == '__main__':
for used_by in entry.get("used", ()):
referrers.add(used_by)
for name, definition in names.items():
- if not name in referrers:
+ if name not in referrers:
continue
if ignore and any(pattern.match(name) for pattern in ignore):
continue
@@ -176,7 +190,7 @@ if __name__ == '__main__':
for uses in entry.get("uses", ()):
referred.add(uses)
for name, definition in names.items():
- if not name in referred:
+ if name not in referred:
continue
if ignore and any(pattern.match(name) for pattern in ignore):
continue
@@ -185,12 +199,12 @@ if __name__ == '__main__':
if args.format == 'yaml':
yaml.dump(result, sys.stdout, default_flow_style=False)
elif args.format == 'dot':
- print "digraph {"
+ print("digraph {")
for name, entry in result.items():
- print name
+ print(name)
for used_by in entry.get("used", ()):
if used_by in result:
- print used_by, "->", name
- print "}"
+ print(used_by, "->", name)
+ print("}")
else:
raise ValueError("Unknown format %r" % (args.format))
diff --git a/scripts-dev/dump_macaroon.py b/scripts-dev/dump_macaroon.py
index fcc5568835..22b30fa78e 100755
--- a/scripts-dev/dump_macaroon.py
+++ b/scripts-dev/dump_macaroon.py
@@ -1,8 +1,11 @@
#!/usr/bin/env python2
-import pymacaroons
+from __future__ import print_function
+
import sys
+import pymacaroons
+
if len(sys.argv) == 1:
sys.stderr.write("usage: %s macaroon [key]\n" % (sys.argv[0],))
sys.exit(1)
@@ -11,14 +14,14 @@ macaroon_string = sys.argv[1]
key = sys.argv[2] if len(sys.argv) > 2 else None
macaroon = pymacaroons.Macaroon.deserialize(macaroon_string)
-print macaroon.inspect()
+print(macaroon.inspect())
-print ""
+print("")
verifier = pymacaroons.Verifier()
verifier.satisfy_general(lambda c: True)
try:
verifier.verify(macaroon, key)
- print "Signature is correct"
+ print("Signature is correct")
except Exception as e:
- print str(e)
+ print(str(e))
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index d2acc7654d..2566ce7cef 100755
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -18,21 +18,21 @@
from __future__ import print_function
import argparse
+import base64
+import json
+import sys
from urlparse import urlparse, urlunparse
import nacl.signing
-import json
-import base64
import requests
-import sys
-
-from requests.adapters import HTTPAdapter
import srvlookup
import yaml
+from requests.adapters import HTTPAdapter
# uncomment the following to enable debug logging of http requests
-#from httplib import HTTPConnection
-#HTTPConnection.debuglevel = 1
+# from httplib import HTTPConnection
+# HTTPConnection.debuglevel = 1
+
def encode_base64(input_bytes):
"""Encode bytes as a base64 string without any padding."""
@@ -58,15 +58,15 @@ def decode_base64(input_string):
def encode_canonical_json(value):
return json.dumps(
- value,
- # Encode code-points outside of ASCII as UTF-8 rather than \u escapes
- ensure_ascii=False,
- # Remove unecessary white space.
- separators=(',',':'),
- # Sort the keys of dictionaries.
- sort_keys=True,
- # Encode the resulting unicode as UTF-8 bytes.
- ).encode("UTF-8")
+ value,
+ # Encode code-points outside of ASCII as UTF-8 rather than \u escapes
+ ensure_ascii=False,
+ # Remove unecessary white space.
+ separators=(',', ':'),
+ # Sort the keys of dictionaries.
+ sort_keys=True,
+ # Encode the resulting unicode as UTF-8 bytes.
+ ).encode("UTF-8")
def sign_json(json_object, signing_key, signing_name):
@@ -88,6 +88,7 @@ def sign_json(json_object, signing_key, signing_name):
NACL_ED25519 = "ed25519"
+
def decode_signing_key_base64(algorithm, version, key_base64):
"""Decode a base64 encoded signing key
Args:
@@ -143,14 +144,12 @@ def request_json(method, origin_name, origin_key, destination, path, content):
authorization_headers = []
for key, sig in signed_json["signatures"][origin_name].items():
- header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
- origin_name, key, sig,
- )
+ header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (origin_name, key, sig)
authorization_headers.append(bytes(header))
- print ("Authorization: %s" % header, file=sys.stderr)
+ print("Authorization: %s" % header, file=sys.stderr)
dest = "matrix://%s%s" % (destination, path)
- print ("Requesting %s" % dest, file=sys.stderr)
+ print("Requesting %s" % dest, file=sys.stderr)
s = requests.Session()
s.mount("matrix://", MatrixConnectionAdapter())
@@ -158,10 +157,7 @@ def request_json(method, origin_name, origin_key, destination, path, content):
result = s.request(
method=method,
url=dest,
- headers={
- "Host": destination,
- "Authorization": authorization_headers[0]
- },
+ headers={"Host": destination, "Authorization": authorization_headers[0]},
verify=False,
data=content,
)
@@ -171,50 +167,50 @@ def request_json(method, origin_name, origin_key, destination, path, content):
def main():
parser = argparse.ArgumentParser(
- description=
- "Signs and sends a federation request to a matrix homeserver",
+ description="Signs and sends a federation request to a matrix homeserver"
)
parser.add_argument(
- "-N", "--server-name",
+ "-N",
+ "--server-name",
help="Name to give as the local homeserver. If unspecified, will be "
- "read from the config file.",
+ "read from the config file.",
)
parser.add_argument(
- "-k", "--signing-key-path",
+ "-k",
+ "--signing-key-path",
help="Path to the file containing the private ed25519 key to sign the "
- "request with.",
+ "request with.",
)
parser.add_argument(
- "-c", "--config",
+ "-c",
+ "--config",
default="homeserver.yaml",
help="Path to server config file. Ignored if --server-name and "
- "--signing-key-path are both given.",
+ "--signing-key-path are both given.",
)
parser.add_argument(
- "-d", "--destination",
+ "-d",
+ "--destination",
default="matrix.org",
help="name of the remote homeserver. We will do SRV lookups and "
- "connect appropriately.",
+ "connect appropriately.",
)
parser.add_argument(
- "-X", "--method",
+ "-X",
+ "--method",
help="HTTP method to use for the request. Defaults to GET if --data is"
- "unspecified, POST if it is."
+ "unspecified, POST if it is.",
)
- parser.add_argument(
- "--body",
- help="Data to send as the body of the HTTP request"
- )
+ parser.add_argument("--body", help="Data to send as the body of the HTTP request")
parser.add_argument(
- "path",
- help="request path. We will add '/_matrix/federation/v1/' to this."
+ "path", help="request path. We will add '/_matrix/federation/v1/' to this."
)
args = parser.parse_args()
@@ -227,13 +223,15 @@ def main():
result = request_json(
args.method,
- args.server_name, key, args.destination,
+ args.server_name,
+ key,
+ args.destination,
"/_matrix/federation/v1/" + args.path,
content=args.body,
)
json.dump(result, sys.stdout)
- print ("")
+ print("")
def read_args_from_config(args):
@@ -253,7 +251,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
return s, 8448
if ":" in s:
- out = s.rsplit(":",1)
+ out = s.rsplit(":", 1)
try:
port = int(out[1])
except ValueError:
@@ -263,7 +261,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
try:
srv = srvlookup.lookup("matrix", "tcp", s)[0]
return srv.host, srv.port
- except:
+ except Exception:
return s, 8448
def get_connection(self, url, proxies=None):
@@ -272,10 +270,9 @@ class MatrixConnectionAdapter(HTTPAdapter):
(host, port) = self.lookup(parsed.netloc)
netloc = "%s:%d" % (host, port)
print("Connecting to %s" % (netloc,), file=sys.stderr)
- url = urlunparse((
- "https", netloc, parsed.path, parsed.params, parsed.query,
- parsed.fragment,
- ))
+ url = urlunparse(
+ ("https", netloc, parsed.path, parsed.params, parsed.query, parsed.fragment)
+ )
return super(MatrixConnectionAdapter, self).get_connection(url, proxies)
diff --git a/scripts-dev/hash_history.py b/scripts-dev/hash_history.py
index 616d6a10e7..514d80fa60 100644
--- a/scripts-dev/hash_history.py
+++ b/scripts-dev/hash_history.py
@@ -1,23 +1,31 @@
-from synapse.storage.pdu import PduStore
-from synapse.storage.signatures import SignatureStore
-from synapse.storage._base import SQLBaseStore
-from synapse.federation.units import Pdu
-from synapse.crypto.event_signing import (
- add_event_pdu_content_hash, compute_pdu_event_reference_hash
-)
-from synapse.api.events.utils import prune_pdu
-from unpaddedbase64 import encode_base64, decode_base64
-from canonicaljson import encode_canonical_json
+from __future__ import print_function
+
import sqlite3
import sys
+from unpaddedbase64 import decode_base64, encode_base64
+
+from synapse.crypto.event_signing import (
+ add_event_pdu_content_hash,
+ compute_pdu_event_reference_hash,
+)
+from synapse.federation.units import Pdu
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.pdu import PduStore
+from synapse.storage.signatures import SignatureStore
+
+
class Store(object):
_get_pdu_tuples = PduStore.__dict__["_get_pdu_tuples"]
_get_pdu_content_hashes_txn = SignatureStore.__dict__["_get_pdu_content_hashes_txn"]
_get_prev_pdu_hashes_txn = SignatureStore.__dict__["_get_prev_pdu_hashes_txn"]
- _get_pdu_origin_signatures_txn = SignatureStore.__dict__["_get_pdu_origin_signatures_txn"]
+ _get_pdu_origin_signatures_txn = SignatureStore.__dict__[
+ "_get_pdu_origin_signatures_txn"
+ ]
_store_pdu_content_hash_txn = SignatureStore.__dict__["_store_pdu_content_hash_txn"]
- _store_pdu_reference_hash_txn = SignatureStore.__dict__["_store_pdu_reference_hash_txn"]
+ _store_pdu_reference_hash_txn = SignatureStore.__dict__[
+ "_store_pdu_reference_hash_txn"
+ ]
_store_prev_pdu_hash_txn = SignatureStore.__dict__["_store_prev_pdu_hash_txn"]
_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
@@ -26,9 +34,7 @@ store = Store()
def select_pdus(cursor):
- cursor.execute(
- "SELECT pdu_id, origin FROM pdus ORDER BY depth ASC"
- )
+ cursor.execute("SELECT pdu_id, origin FROM pdus ORDER BY depth ASC")
ids = cursor.fetchall()
@@ -41,23 +47,30 @@ def select_pdus(cursor):
for pdu in pdus:
try:
if pdu.prev_pdus:
- print "PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus
+ print("PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus)
for pdu_id, origin, hashes in pdu.prev_pdus:
ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)]
hashes[ref_alg] = encode_base64(ref_hsh)
- store._store_prev_pdu_hash_txn(cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh)
- print "SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus
+ store._store_prev_pdu_hash_txn(
+ cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh
+ )
+ print("SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus)
pdu = add_event_pdu_content_hash(pdu)
ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu)
reference_hashes[(pdu.pdu_id, pdu.origin)] = (ref_alg, ref_hsh)
- store._store_pdu_reference_hash_txn(cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh)
+ store._store_pdu_reference_hash_txn(
+ cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh
+ )
for alg, hsh_base64 in pdu.hashes.items():
- print alg, hsh_base64
- store._store_pdu_content_hash_txn(cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64))
+ print(alg, hsh_base64)
+ store._store_pdu_content_hash_txn(
+ cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64)
+ )
+
+ except Exception:
+ print("FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus)
- except:
- print "FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus
def main():
conn = sqlite3.connect(sys.argv[1])
@@ -65,5 +78,6 @@ def main():
select_pdus(cursor)
conn.commit()
-if __name__=='__main__':
+
+if __name__ == '__main__':
main()
diff --git a/scripts-dev/list_url_patterns.py b/scripts-dev/list_url_patterns.py
index 58d40c4ff4..da027be26e 100755
--- a/scripts-dev/list_url_patterns.py
+++ b/scripts-dev/list_url_patterns.py
@@ -1,18 +1,17 @@
#! /usr/bin/python
-import ast
import argparse
+import ast
import os
import sys
+
import yaml
PATTERNS_V1 = []
PATTERNS_V2 = []
-RESULT = {
- "v1": PATTERNS_V1,
- "v2": PATTERNS_V2,
-}
+RESULT = {"v1": PATTERNS_V1, "v2": PATTERNS_V2}
+
class CallVisitor(ast.NodeVisitor):
def visit_Call(self, node):
@@ -21,7 +20,6 @@ class CallVisitor(ast.NodeVisitor):
else:
return
-
if name == "client_path_patterns":
PATTERNS_V1.append(node.args[0].s)
elif name == "client_v2_patterns":
@@ -42,8 +40,10 @@ def find_patterns_in_file(filepath):
parser = argparse.ArgumentParser(description='Find url patterns.')
parser.add_argument(
- "directories", nargs='+', metavar="DIR",
- help="Directories to search for definitions"
+ "directories",
+ nargs='+',
+ metavar="DIR",
+ help="Directories to search for definitions",
)
args = parser.parse_args()
diff --git a/scripts-dev/tail-synapse.py b/scripts-dev/tail-synapse.py
index 18be711e92..7c9985d9f0 100644
--- a/scripts-dev/tail-synapse.py
+++ b/scripts-dev/tail-synapse.py
@@ -1,8 +1,9 @@
-import requests
import collections
+import json
import sys
import time
-import json
+
+import requests
Entry = collections.namedtuple("Entry", "name position rows")
@@ -30,11 +31,11 @@ def parse_response(content):
def replicate(server, streams):
- return parse_response(requests.get(
- server + "/_synapse/replication",
- verify=False,
- params=streams
- ).content)
+ return parse_response(
+ requests.get(
+ server + "/_synapse/replication", verify=False, params=streams
+ ).content
+ )
def main():
@@ -45,16 +46,16 @@ def main():
try:
streams = {
row.name: row.position
- for row in replicate(server, {"streams":"-1"})["streams"].rows
+ for row in replicate(server, {"streams": "-1"})["streams"].rows
}
- except requests.exceptions.ConnectionError as e:
+ except requests.exceptions.ConnectionError:
time.sleep(0.1)
while True:
try:
results = replicate(server, streams)
- except:
- sys.stdout.write("connection_lost("+ repr(streams) + ")\n")
+ except Exception:
+ sys.stdout.write("connection_lost(" + repr(streams) + ")\n")
break
for update in results.values():
for row in update.rows:
@@ -62,6 +63,5 @@ def main():
streams[update.name] = update.position
-
-if __name__=='__main__':
+if __name__ == '__main__':
main()
diff --git a/scripts/hash_password b/scripts/hash_password
index 215ab25cfe..a62bb5aa83 100755
--- a/scripts/hash_password
+++ b/scripts/hash_password
@@ -1,12 +1,10 @@
#!/usr/bin/env python
import argparse
-
+import getpass
import sys
import bcrypt
-import getpass
-
import yaml
bcrypt_rounds=12
@@ -52,4 +50,3 @@ if __name__ == "__main__":
password = prompt_for_pass()
print bcrypt.hashpw(password + password_pepper, bcrypt.gensalt(bcrypt_rounds))
-
diff --git a/scripts/move_remote_media_to_new_store.py b/scripts/move_remote_media_to_new_store.py
index 7914ead889..e630936f78 100755
--- a/scripts/move_remote_media_to_new_store.py
+++ b/scripts/move_remote_media_to_new_store.py
@@ -36,12 +36,9 @@ from __future__ import print_function
import argparse
import logging
-
-import sys
-
import os
-
import shutil
+import sys
from synapse.rest.media.v1.filepath import MediaFilePaths
@@ -77,24 +74,23 @@ def move_media(origin_server, file_id, src_paths, dest_paths):
if not os.path.exists(original_file):
logger.warn(
"Original for %s/%s (%s) does not exist",
- origin_server, file_id, original_file,
+ origin_server,
+ file_id,
+ original_file,
)
else:
mkdir_and_move(
- original_file,
- dest_paths.remote_media_filepath(origin_server, file_id),
+ original_file, dest_paths.remote_media_filepath(origin_server, file_id)
)
# now look for thumbnails
- original_thumb_dir = src_paths.remote_media_thumbnail_dir(
- origin_server, file_id,
- )
+ original_thumb_dir = src_paths.remote_media_thumbnail_dir(origin_server, file_id)
if not os.path.exists(original_thumb_dir):
return
mkdir_and_move(
original_thumb_dir,
- dest_paths.remote_media_thumbnail_dir(origin_server, file_id)
+ dest_paths.remote_media_thumbnail_dir(origin_server, file_id),
)
@@ -109,24 +105,16 @@ def mkdir_and_move(original_file, dest_file):
if __name__ == "__main__":
parser = argparse.ArgumentParser(
- description=__doc__,
- formatter_class = argparse.RawDescriptionHelpFormatter,
- )
- parser.add_argument(
- "-v", action='store_true', help='enable debug logging')
- parser.add_argument(
- "src_repo",
- help="Path to source content repo",
- )
- parser.add_argument(
- "dest_repo",
- help="Path to source content repo",
+ description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
+ parser.add_argument("-v", action='store_true', help='enable debug logging')
+ parser.add_argument("src_repo", help="Path to source content repo")
+ parser.add_argument("dest_repo", help="Path to source content repo")
args = parser.parse_args()
logging_config = {
"level": logging.DEBUG if args.v else logging.INFO,
- "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
+ "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s",
}
logging.basicConfig(**logging_config)
diff --git a/scripts/register_new_matrix_user b/scripts/register_new_matrix_user
index 91bdb3a25b..89143c5d59 100755
--- a/scripts/register_new_matrix_user
+++ b/scripts/register_new_matrix_user
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
import argparse
import getpass
@@ -22,19 +23,23 @@ import hmac
import json
import sys
import urllib2
+
+from six import input
+
import yaml
def request_registration(user, password, server_location, shared_secret, admin=False):
req = urllib2.Request(
"%s/_matrix/client/r0/admin/register" % (server_location,),
- headers={'Content-Type': 'application/json'}
+ headers={'Content-Type': 'application/json'},
)
try:
if sys.version_info[:3] >= (2, 7, 9):
# As of version 2.7.9, urllib2 now checks SSL certs
import ssl
+
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
else:
f = urllib2.urlopen(req)
@@ -42,18 +47,15 @@ def request_registration(user, password, server_location, shared_secret, admin=F
f.close()
nonce = json.loads(body)["nonce"]
except urllib2.HTTPError as e:
- print "ERROR! Received %d %s" % (e.code, e.reason,)
+ print("ERROR! Received %d %s" % (e.code, e.reason))
if 400 <= e.code < 500:
if e.info().type == "application/json":
resp = json.load(e)
if "error" in resp:
- print resp["error"]
+ print(resp["error"])
sys.exit(1)
- mac = hmac.new(
- key=shared_secret,
- digestmod=hashlib.sha1,
- )
+ mac = hmac.new(key=shared_secret, digestmod=hashlib.sha1)
mac.update(nonce)
mac.update("\x00")
@@ -75,30 +77,31 @@ def request_registration(user, password, server_location, shared_secret, admin=F
server_location = server_location.rstrip("/")
- print "Sending registration request..."
+ print("Sending registration request...")
req = urllib2.Request(
"%s/_matrix/client/r0/admin/register" % (server_location,),
data=json.dumps(data),
- headers={'Content-Type': 'application/json'}
+ headers={'Content-Type': 'application/json'},
)
try:
if sys.version_info[:3] >= (2, 7, 9):
# As of version 2.7.9, urllib2 now checks SSL certs
import ssl
+
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
else:
f = urllib2.urlopen(req)
f.read()
f.close()
- print "Success."
+ print("Success.")
except urllib2.HTTPError as e:
- print "ERROR! Received %d %s" % (e.code, e.reason,)
+ print("ERROR! Received %d %s" % (e.code, e.reason))
if 400 <= e.code < 500:
if e.info().type == "application/json":
resp = json.load(e)
if "error" in resp:
- print resp["error"]
+ print(resp["error"])
sys.exit(1)
@@ -106,35 +109,35 @@ def register_new_user(user, password, server_location, shared_secret, admin):
if not user:
try:
default_user = getpass.getuser()
- except:
+ except Exception:
default_user = None
if default_user:
- user = raw_input("New user localpart [%s]: " % (default_user,))
+ user = input("New user localpart [%s]: " % (default_user,))
if not user:
user = default_user
else:
- user = raw_input("New user localpart: ")
+ user = input("New user localpart: ")
if not user:
- print "Invalid user name"
+ print("Invalid user name")
sys.exit(1)
if not password:
password = getpass.getpass("Password: ")
if not password:
- print "Password cannot be blank."
+ print("Password cannot be blank.")
sys.exit(1)
confirm_password = getpass.getpass("Confirm password: ")
if password != confirm_password:
- print "Passwords do not match"
+ print("Passwords do not match")
sys.exit(1)
if admin is None:
- admin = raw_input("Make admin [no]: ")
+ admin = input("Make admin [no]: ")
if admin in ("y", "yes", "true"):
admin = True
else:
@@ -146,42 +149,51 @@ def register_new_user(user, password, server_location, shared_secret, admin):
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Used to register new users with a given home server when"
- " registration has been disabled. The home server must be"
- " configured with the 'registration_shared_secret' option"
- " set.",
+ " registration has been disabled. The home server must be"
+ " configured with the 'registration_shared_secret' option"
+ " set."
)
parser.add_argument(
- "-u", "--user",
+ "-u",
+ "--user",
default=None,
help="Local part of the new user. Will prompt if omitted.",
)
parser.add_argument(
- "-p", "--password",
+ "-p",
+ "--password",
default=None,
help="New password for user. Will prompt if omitted.",
)
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
- "-a", "--admin",
+ "-a",
+ "--admin",
action="store_true",
- help="Register new user as an admin. Will prompt if --no-admin is not set either.",
+ help=(
+ "Register new user as an admin. "
+ "Will prompt if --no-admin is not set either."
+ ),
)
admin_group.add_argument(
"--no-admin",
action="store_true",
- help="Register new user as a regular user. Will prompt if --admin is not set either.",
+ help=(
+ "Register new user as a regular user. "
+ "Will prompt if --admin is not set either."
+ ),
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
- "-c", "--config",
+ "-c",
+ "--config",
type=argparse.FileType('r'),
help="Path to server config file. Used to read in shared secret.",
)
group.add_argument(
- "-k", "--shared-secret",
- help="Shared secret as defined in server config file.",
+ "-k", "--shared-secret", help="Shared secret as defined in server config file."
)
parser.add_argument(
@@ -189,7 +201,7 @@ if __name__ == "__main__":
default="https://localhost:8448",
nargs='?',
help="URL to use to talk to the home server. Defaults to "
- " 'https://localhost:8448'.",
+ " 'https://localhost:8448'.",
)
args = parser.parse_args()
@@ -198,7 +210,7 @@ if __name__ == "__main__":
config = yaml.safe_load(args.config)
secret = config.get("registration_shared_secret", None)
if not secret:
- print "No 'registration_shared_secret' defined in config."
+ print("No 'registration_shared_secret' defined in config.")
sys.exit(1)
else:
secret = args.shared_secret
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index b9b828c154..3c7b606323 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -15,23 +15,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
-from twisted.enterprise import adbapi
-
-from synapse.storage._base import LoggingTransaction, SQLBaseStore
-from synapse.storage.engines import create_engine
-from synapse.storage.prepare_database import prepare_database
-
import argparse
import curses
import logging
import sys
import time
import traceback
-import yaml
from six import string_types
+import yaml
+
+from twisted.enterprise import adbapi
+from twisted.internet import defer, reactor
+
+from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.storage.engines import create_engine
+from synapse.storage.prepare_database import prepare_database
logger = logging.getLogger("synapse_port_db")
@@ -105,6 +105,7 @@ class Store(object):
*All* database interactions should go through this object.
"""
+
def __init__(self, db_pool, engine):
self.db_pool = db_pool
self.database_engine = engine
@@ -135,7 +136,8 @@ class Store(object):
txn = conn.cursor()
return func(
LoggingTransaction(txn, desc, self.database_engine, [], []),
- *args, **kwargs
+ *args,
+ **kwargs
)
except self.database_engine.module.DatabaseError as e:
if self.database_engine.is_deadlock(e):
@@ -158,22 +160,20 @@ class Store(object):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()
+
return self.runInteraction("execute_sql", r)
def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in headers),
- ", ".join("%s" for _ in headers)
+ ", ".join("%s" for _ in headers),
)
try:
txn.executemany(sql, rows)
- except:
- logger.exception(
- "Failed to insert: %s",
- table,
- )
+ except Exception:
+ logger.exception("Failed to insert: %s", table)
raise
@@ -206,7 +206,7 @@ class Porter(object):
"table_name": table,
"forward_rowid": 1,
"backward_rowid": 0,
- }
+ },
)
forward_chunk = 1
@@ -221,10 +221,10 @@ class Porter(object):
table, forward_chunk, backward_chunk
)
else:
+
def delete_all(txn):
txn.execute(
- "DELETE FROM port_from_sqlite3 WHERE table_name = %s",
- (table,)
+ "DELETE FROM port_from_sqlite3 WHERE table_name = %s", (table,)
)
txn.execute("TRUNCATE %s CASCADE" % (table,))
@@ -232,11 +232,7 @@ class Porter(object):
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
- values={
- "table_name": table,
- "forward_rowid": 1,
- "backward_rowid": 0,
- }
+ values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
forward_chunk = 1
@@ -251,12 +247,16 @@ class Porter(object):
)
@defer.inlineCallbacks
- def handle_table(self, table, postgres_size, table_size, forward_chunk,
- backward_chunk):
+ def handle_table(
+ self, table, postgres_size, table_size, forward_chunk, backward_chunk
+ ):
logger.info(
"Table %s: %i/%i (rows %i-%i) already ported",
- table, postgres_size, table_size,
- backward_chunk+1, forward_chunk-1,
+ table,
+ postgres_size,
+ table_size,
+ backward_chunk + 1,
+ forward_chunk - 1,
)
if not table_size:
@@ -271,7 +271,9 @@ class Porter(object):
return
if table in (
- "user_directory", "user_directory_search", "users_who_share_rooms",
+ "user_directory",
+ "user_directory_search",
+ "users_who_share_rooms",
"users_in_pubic_room",
):
# We don't port these tables, as they're a faff and we can regenreate
@@ -283,37 +285,35 @@ class Porter(object):
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
yield self.postgres_store._simple_insert(
- table=table,
- values={"stream_id": None},
+ table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
return
forward_select = (
- "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
- % (table,)
+ "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,)
)
backward_select = (
- "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?"
- % (table,)
+ "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" % (table,)
)
do_forward = [True]
do_backward = [True]
while True:
+
def r(txn):
forward_rows = []
backward_rows = []
if do_forward[0]:
- txn.execute(forward_select, (forward_chunk, self.batch_size,))
+ txn.execute(forward_select, (forward_chunk, self.batch_size))
forward_rows = txn.fetchall()
if not forward_rows:
do_forward[0] = False
if do_backward[0]:
- txn.execute(backward_select, (backward_chunk, self.batch_size,))
+ txn.execute(backward_select, (backward_chunk, self.batch_size))
backward_rows = txn.fetchall()
if not backward_rows:
do_backward[0] = False
@@ -325,9 +325,7 @@ class Porter(object):
return headers, forward_rows, backward_rows
- headers, frows, brows = yield self.sqlite_store.runInteraction(
- "select", r
- )
+ headers, frows, brows = yield self.sqlite_store.runInteraction("select", r)
if frows or brows:
if frows:
@@ -339,9 +337,7 @@ class Porter(object):
rows = self._convert_rows(table, headers, rows)
def insert(txn):
- self.postgres_store.insert_many_txn(
- txn, table, headers[1:], rows
- )
+ self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
self.postgres_store._simple_update_one_txn(
txn,
@@ -362,8 +358,9 @@ class Porter(object):
return
@defer.inlineCallbacks
- def handle_search_table(self, postgres_size, table_size, forward_chunk,
- backward_chunk):
+ def handle_search_table(
+ self, postgres_size, table_size, forward_chunk, backward_chunk
+ ):
select = (
"SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
" FROM event_search as es"
@@ -373,8 +370,9 @@ class Porter(object):
)
while True:
+
def r(txn):
- txn.execute(select, (forward_chunk, self.batch_size,))
+ txn.execute(select, (forward_chunk, self.batch_size))
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
@@ -402,18 +400,21 @@ class Porter(object):
else:
rows_dict.append(d)
- txn.executemany(sql, [
- (
- row["event_id"],
- row["room_id"],
- row["key"],
- row["sender"],
- row["value"],
- row["origin_server_ts"],
- row["stream_ordering"],
- )
- for row in rows_dict
- ])
+ txn.executemany(
+ sql,
+ [
+ (
+ row["event_id"],
+ row["room_id"],
+ row["key"],
+ row["sender"],
+ row["value"],
+ row["origin_server_ts"],
+ row["stream_ordering"],
+ )
+ for row in rows_dict
+ ],
+ )
self.postgres_store._simple_update_one_txn(
txn,
@@ -437,7 +438,8 @@ class Porter(object):
def setup_db(self, db_config, database_engine):
db_conn = database_engine.module.connect(
**{
- k: v for k, v in db_config.get("args", {}).items()
+ k: v
+ for k, v in db_config.get("args", {}).items()
if not k.startswith("cp_")
}
)
@@ -450,13 +452,11 @@ class Porter(object):
def run(self):
try:
sqlite_db_pool = adbapi.ConnectionPool(
- self.sqlite_config["name"],
- **self.sqlite_config["args"]
+ self.sqlite_config["name"], **self.sqlite_config["args"]
)
postgres_db_pool = adbapi.ConnectionPool(
- self.postgres_config["name"],
- **self.postgres_config["args"]
+ self.postgres_config["name"], **self.postgres_config["args"]
)
sqlite_engine = create_engine(sqlite_config)
@@ -465,9 +465,7 @@ class Porter(object):
self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
self.postgres_store = Store(postgres_db_pool, postgres_engine)
- yield self.postgres_store.execute(
- postgres_engine.check_database
- )
+ yield self.postgres_store.execute(postgres_engine.check_database)
# Step 1. Set up databases.
self.progress.set_state("Preparing SQLite3")
@@ -477,6 +475,7 @@ class Porter(object):
self.setup_db(postgres_config, postgres_engine)
self.progress.set_state("Creating port tables")
+
def create_port_table(txn):
txn.execute(
"CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
@@ -501,10 +500,9 @@ class Porter(object):
)
try:
- yield self.postgres_store.runInteraction(
- "alter_table", alter_table
- )
- except Exception as e:
+ yield self.postgres_store.runInteraction("alter_table", alter_table)
+ except Exception:
+ # On Error Resume Next
pass
yield self.postgres_store.runInteraction(
@@ -514,11 +512,7 @@ class Porter(object):
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
- table="sqlite_master",
- keyvalues={
- "type": "table",
- },
- retcol="name",
+ table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)
postgres_tables = yield self.postgres_store._simple_select_onecol(
@@ -545,18 +539,14 @@ class Porter(object):
# Step 4. Do the copying.
self.progress.set_state("Copying to postgres")
yield defer.gatherResults(
- [
- self.handle_table(*res)
- for res in setup_res
- ],
- consumeErrors=True,
+ [self.handle_table(*res) for res in setup_res], consumeErrors=True
)
# Step 5. Do final post-processing
yield self._setup_state_group_id_seq()
self.progress.done()
- except:
+ except Exception:
global end_error_exec_info
end_error_exec_info = sys.exc_info()
logger.exception("")
@@ -566,9 +556,7 @@ class Porter(object):
def _convert_rows(self, table, headers, rows):
bool_col_names = BOOLEAN_COLUMNS.get(table, [])
- bool_cols = [
- i for i, h in enumerate(headers) if h in bool_col_names
- ]
+ bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names]
class BadValueException(Exception):
pass
@@ -577,18 +565,21 @@ class Porter(object):
if j in bool_cols:
return bool(col)
elif isinstance(col, string_types) and "\0" in col:
- logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col)
- raise BadValueException();
+ logger.warn(
+ "DROPPING ROW: NUL value in table %s col %s: %r",
+ table,
+ headers[j],
+ col,
+ )
+ raise BadValueException()
return col
outrows = []
for i, row in enumerate(rows):
try:
- outrows.append(tuple(
- conv(j, col)
- for j, col in enumerate(row)
- if j > 0
- ))
+ outrows.append(
+ tuple(conv(j, col) for j, col in enumerate(row) if j > 0)
+ )
except BadValueException:
pass
@@ -616,9 +607,7 @@ class Porter(object):
return headers, [r for r in rows if r[ts_ind] < yesterday]
- headers, rows = yield self.sqlite_store.runInteraction(
- "select", r,
- )
+ headers, rows = yield self.sqlite_store.runInteraction("select", r)
rows = self._convert_rows("sent_transactions", headers, rows)
@@ -639,7 +628,7 @@ class Porter(object):
txn.execute(
"SELECT rowid FROM sent_transactions WHERE ts >= ?"
" ORDER BY rowid ASC LIMIT 1",
- (yesterday,)
+ (yesterday,),
)
rows = txn.fetchall()
@@ -657,21 +646,17 @@ class Porter(object):
"table_name": "sent_transactions",
"forward_rowid": next_chunk,
"backward_rowid": 0,
- }
+ },
)
def get_sent_table_size(txn):
txn.execute(
- "SELECT count(*) FROM sent_transactions"
- " WHERE ts >= ?",
- (yesterday,)
+ "SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,)
)
size, = txn.fetchone()
return int(size)
- remaining_count = yield self.sqlite_store.execute(
- get_sent_table_size
- )
+ remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
total_count = remaining_count + inserted_rows
@@ -680,13 +665,11 @@ class Porter(object):
@defer.inlineCallbacks
def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
frows = yield self.sqlite_store.execute_sql(
- "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,),
- forward_chunk,
+ "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk
)
brows = yield self.sqlite_store.execute_sql(
- "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,),
- backward_chunk,
+ "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk
)
defer.returnValue(frows[0][0] + brows[0][0])
@@ -694,7 +677,7 @@ class Porter(object):
@defer.inlineCallbacks
def _get_already_ported_count(self, table):
rows = yield self.postgres_store.execute_sql(
- "SELECT count(*) FROM %s" % (table,),
+ "SELECT count(*) FROM %s" % (table,)
)
defer.returnValue(rows[0][0])
@@ -717,22 +700,21 @@ class Porter(object):
def _setup_state_group_id_seq(self):
def r(txn):
txn.execute("SELECT MAX(id) FROM state_groups")
- next_id = txn.fetchone()[0]+1
- txn.execute(
- "ALTER SEQUENCE state_group_id_seq RESTART WITH %s",
- (next_id,),
- )
+ next_id = txn.fetchone()[0] + 1
+ txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
+
return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
##############################################
-###### The following is simply UI stuff ######
+# The following is simply UI stuff
##############################################
class Progress(object):
"""Used to report progress of the port
"""
+
def __init__(self):
self.tables = {}
@@ -758,6 +740,7 @@ class Progress(object):
class CursesProgress(Progress):
"""Reports progress to a curses window
"""
+
def __init__(self, stdscr):
self.stdscr = stdscr
@@ -801,7 +784,7 @@ class CursesProgress(Progress):
duration = int(now) - int(self.start_time)
minutes, seconds = divmod(duration, 60)
- duration_str = '%02dm %02ds' % (minutes, seconds,)
+ duration_str = '%02dm %02ds' % (minutes, seconds)
if self.finished:
status = "Time spent: %s (Done!)" % (duration_str,)
@@ -814,16 +797,12 @@ class CursesProgress(Progress):
est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
else:
est_remaining_str = "Unknown"
- status = (
- "Time spent: %s (est. remaining: %s)"
- % (duration_str, est_remaining_str,)
+ status = "Time spent: %s (est. remaining: %s)" % (
+ duration_str,
+ est_remaining_str,
)
- self.stdscr.addstr(
- 0, 0,
- status,
- curses.A_BOLD,
- )
+ self.stdscr.addstr(0, 0, status, curses.A_BOLD)
max_len = max([len(t) for t in self.tables.keys()])
@@ -831,9 +810,7 @@ class CursesProgress(Progress):
middle_space = 1
items = self.tables.items()
- items.sort(
- key=lambda i: (i[1]["perc"], i[0]),
- )
+ items.sort(key=lambda i: (i[1]["perc"], i[0]))
for i, (table, data) in enumerate(items):
if i + 2 >= rows:
@@ -844,9 +821,7 @@ class CursesProgress(Progress):
color = curses.color_pair(2) if perc == 100 else curses.color_pair(1)
self.stdscr.addstr(
- i + 2, left_margin + max_len - len(table),
- table,
- curses.A_BOLD | color,
+ i + 2, left_margin + max_len - len(table), table, curses.A_BOLD | color
)
size = 20
@@ -857,15 +832,13 @@ class CursesProgress(Progress):
)
self.stdscr.addstr(
- i + 2, left_margin + max_len + middle_space,
+ i + 2,
+ left_margin + max_len + middle_space,
"%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]),
)
if self.finished:
- self.stdscr.addstr(
- rows - 1, 0,
- "Press any key to exit...",
- )
+ self.stdscr.addstr(rows - 1, 0, "Press any key to exit...")
self.stdscr.refresh()
self.last_update = time.time()
@@ -877,29 +850,25 @@ class CursesProgress(Progress):
def set_state(self, state):
self.stdscr.clear()
- self.stdscr.addstr(
- 0, 0,
- state + "...",
- curses.A_BOLD,
- )
+ self.stdscr.addstr(0, 0, state + "...", curses.A_BOLD)
self.stdscr.refresh()
class TerminalProgress(Progress):
"""Just prints progress to the terminal
"""
+
def update(self, table, num_done):
super(TerminalProgress, self).update(table, num_done)
data = self.tables[table]
- print "%s: %d%% (%d/%d)" % (
- table, data["perc"],
- data["num_done"], data["total"],
+ print(
+ "%s: %d%% (%d/%d)" % (table, data["perc"], data["num_done"], data["total"])
)
def set_state(self, state):
- print state + "..."
+ print(state + "...")
##############################################
@@ -909,34 +878,38 @@ class TerminalProgress(Progress):
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="A script to port an existing synapse SQLite database to"
- " a new PostgreSQL database."
+ " a new PostgreSQL database."
)
parser.add_argument("-v", action='store_true')
parser.add_argument(
- "--sqlite-database", required=True,
+ "--sqlite-database",
+ required=True,
help="The snapshot of the SQLite database file. This must not be"
- " currently used by a running synapse server"
+ " currently used by a running synapse server",
)
parser.add_argument(
- "--postgres-config", type=argparse.FileType('r'), required=True,
- help="The database config file for the PostgreSQL database"
+ "--postgres-config",
+ type=argparse.FileType('r'),
+ required=True,
+ help="The database config file for the PostgreSQL database",
)
parser.add_argument(
- "--curses", action='store_true',
- help="display a curses based progress UI"
+ "--curses", action='store_true', help="display a curses based progress UI"
)
parser.add_argument(
- "--batch-size", type=int, default=1000,
+ "--batch-size",
+ type=int,
+ default=1000,
help="The number of rows to select from the SQLite table each"
- " iteration [default=1000]",
+ " iteration [default=1000]",
)
args = parser.parse_args()
logging_config = {
"level": logging.DEBUG if args.v else logging.INFO,
- "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
+ "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s",
}
if args.curses:
diff --git a/setup.cfg b/setup.cfg
index 52feaa9cc7..b6b4aa740d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -14,17 +14,16 @@ ignore =
pylint.cfg
tox.ini
-[pep8]
-max-line-length = 90
-# W503 requires that binary operators be at the end, not start, of lines. Erik
-# doesn't like it. E203 is contrary to PEP8. E731 is silly.
-ignore = W503,E203,E731
-
[flake8]
-# note that flake8 inherits the "ignore" settings from "pep8" (because it uses
-# pep8 to do those checks), but not the "max-line-length" setting
max-line-length = 90
-ignore=W503,E203,E731
+
+# see https://pycodestyle.readthedocs.io/en/latest/intro.html#error-codes
+# for error codes. The ones we ignore are:
+# W503: line break before binary operator
+# W504: line break after binary operator
+# E203: whitespace before ':' (which is contrary to pep8?)
+# E731: do not assign a lambda expression, use a def
+ignore=W503,W504,E203,E731
[isort]
line_length = 89
diff --git a/setup.py b/setup.py
index b00c2af367..00b69c43f5 100755
--- a/setup.py
+++ b/setup.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python
-# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2014-2017 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2017-2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -86,7 +88,7 @@ setup(
name="matrix-synapse",
version=version,
packages=find_packages(exclude=["tests", "tests.*"]),
- description="Reference Synapse Home Server",
+ description="Reference homeserver for the Matrix decentralised comms protocol",
install_requires=dependencies['requirements'](include_conditional=True).keys(),
dependency_links=dependencies["DEPENDENCY_LINKS"].values(),
include_package_data=True,
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index eed8c67e6a..677c0bdd4c 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -172,7 +172,10 @@ USER_FILTER_SCHEMA = {
# events a lot easier as we can then use a negative lookbehind
# assertion to split '\.' If we allowed \\ then it would
# incorrectly split '\\.' See synapse.events.utils.serialize_event
- "pattern": "^((?!\\\).)*$"
+ #
+ # Note that because this is a regular expression, we have to escape
+ # each backslash in the pattern.
+ "pattern": r"^((?!\\\\).)*$"
}
}
},
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e3f0d99a3f..0b85b377e3 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,7 @@ import sys
from six import iteritems
+import psutil
from prometheus_client import Gauge
from twisted.application import service
@@ -502,7 +503,6 @@ def run(hs):
def performance_stats_init():
try:
- import psutil
process = psutil.Process()
# Ensure we can fetch both, and make the initial request for cpu_percent
# so the next request will use this as the initial point.
@@ -510,12 +510,9 @@ def run(hs):
process.cpu_percent(interval=None)
logger.info("report_stats can use psutil")
stats_process.append(process)
- except (ImportError, AttributeError):
- logger.warn(
- "report_stats enabled but psutil is not installed or incorrect version."
- " Disabling reporting of memory/cpu stats."
- " Ensuring psutil is available will help matrix.org track performance"
- " changes across releases."
+ except (AttributeError):
+ logger.warning(
+ "Unable to read memory/cpu stats. Disabling reporting."
)
def generate_user_daily_visit_stats():
@@ -530,10 +527,13 @@ def run(hs):
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
# monthly active user limiting functionality
- clock.looping_call(
- hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
- )
- hs.get_datastore().reap_monthly_active_users()
+ def reap_monthly_active_users():
+ return run_as_background_process(
+ "reap_monthly_active_users",
+ hs.get_datastore().reap_monthly_active_users,
+ )
+ clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
+ reap_monthly_active_users()
@defer.inlineCallbacks
def generate_monthly_active_users():
@@ -547,12 +547,23 @@ def run(hs):
registered_reserved_users_mau_gauge.set(float(reserved_count))
max_mau_gauge.set(float(hs.config.max_mau_value))
- hs.get_datastore().initialise_reserved_users(
- hs.config.mau_limits_reserved_threepids
+ def start_generate_monthly_active_users():
+ return run_as_background_process(
+ "generate_monthly_active_users",
+ generate_monthly_active_users,
+ )
+
+ # XXX is this really supposed to be a background process? it looks
+ # like it needs to complete before some of the other stuff runs.
+ run_as_background_process(
+ "initialise_reserved_users",
+ hs.get_datastore().initialise_reserved_users,
+ hs.config.mau_limits_reserved_threepids,
)
- generate_monthly_active_users()
+
+ start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
- clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+ clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
if hs.config.report_stats:
@@ -568,7 +579,7 @@ def run(hs):
clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
- print (hs.config.pid_file)
+ print(hs.config.pid_file)
_base.start_reactor(
"synapse-homeserver",
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 0f9f8e19f6..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -161,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
- self.pusher_pool.on_new_receipts(
+ yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
@@ -183,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
def start_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
- return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+ return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
def start(config_options):
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 8fccf573ee..79fe9c3dac 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -28,7 +28,7 @@ if __name__ == "__main__":
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
- print (getattr(config, key))
+ print(getattr(config, key))
sys.exit(0)
else:
sys.stderr.write("Unknown command %r\n" % (action,))
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 3d2e90dd5b..14dae65ea0 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -106,10 +106,7 @@ class Config(object):
@classmethod
def check_file(cls, file_path, config_name):
if file_path is None:
- raise ConfigError(
- "Missing config for %s."
- % (config_name,)
- )
+ raise ConfigError("Missing config for %s." % (config_name,))
try:
os.stat(file_path)
except OSError as e:
@@ -128,9 +125,7 @@ class Config(object):
if e.errno != errno.EEXIST:
raise
if not os.path.isdir(dir_path):
- raise ConfigError(
- "%s is not a directory" % (dir_path,)
- )
+ raise ConfigError("%s is not a directory" % (dir_path,))
return dir_path
@classmethod
@@ -156,21 +151,20 @@ class Config(object):
return results
def generate_config(
- self,
- config_dir_path,
- server_name,
- is_generating_file,
- report_stats=None,
+ self, config_dir_path, server_name, is_generating_file, report_stats=None
):
default_config = "# vim:ft=yaml\n"
- default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all(
- "default_config",
- config_dir_path=config_dir_path,
- server_name=server_name,
- is_generating_file=is_generating_file,
- report_stats=report_stats,
- ))
+ default_config += "\n\n".join(
+ dedent(conf)
+ for conf in self.invoke_all(
+ "default_config",
+ config_dir_path=config_dir_path,
+ server_name=server_name,
+ is_generating_file=is_generating_file,
+ report_stats=report_stats,
+ )
+ )
config = yaml.load(default_config)
@@ -178,23 +172,22 @@ class Config(object):
@classmethod
def load_config(cls, description, argv):
- config_parser = argparse.ArgumentParser(
- description=description,
- )
+ config_parser = argparse.ArgumentParser(description=description)
config_parser.add_argument(
- "-c", "--config-path",
+ "-c",
+ "--config-path",
action="append",
metavar="CONFIG_FILE",
help="Specify config file. Can be given multiple times and"
- " may specify directories containing *.yaml files."
+ " may specify directories containing *.yaml files.",
)
config_parser.add_argument(
"--keys-directory",
metavar="DIRECTORY",
help="Where files such as certs and signing keys are stored when"
- " their location is given explicitly in the config."
- " Defaults to the directory containing the last config file",
+ " their location is given explicitly in the config."
+ " Defaults to the directory containing the last config file",
)
config_args = config_parser.parse_args(argv)
@@ -203,9 +196,7 @@ class Config(object):
obj = cls()
obj.read_config_files(
- config_files,
- keys_directory=config_args.keys_directory,
- generate_keys=False,
+ config_files, keys_directory=config_args.keys_directory, generate_keys=False
)
return obj
@@ -213,38 +204,38 @@ class Config(object):
def load_or_generate_config(cls, description, argv):
config_parser = argparse.ArgumentParser(add_help=False)
config_parser.add_argument(
- "-c", "--config-path",
+ "-c",
+ "--config-path",
action="append",
metavar="CONFIG_FILE",
help="Specify config file. Can be given multiple times and"
- " may specify directories containing *.yaml files."
+ " may specify directories containing *.yaml files.",
)
config_parser.add_argument(
"--generate-config",
action="store_true",
- help="Generate a config file for the server name"
+ help="Generate a config file for the server name",
)
config_parser.add_argument(
"--report-stats",
action="store",
help="Whether the generated config reports anonymized usage statistics",
- choices=["yes", "no"]
+ choices=["yes", "no"],
)
config_parser.add_argument(
"--generate-keys",
action="store_true",
- help="Generate any missing key files then exit"
+ help="Generate any missing key files then exit",
)
config_parser.add_argument(
"--keys-directory",
metavar="DIRECTORY",
help="Used with 'generate-*' options to specify where files such as"
- " certs and signing keys should be stored in, unless explicitly"
- " specified in the config."
+ " certs and signing keys should be stored in, unless explicitly"
+ " specified in the config.",
)
config_parser.add_argument(
- "-H", "--server-name",
- help="The server name to generate a config file for"
+ "-H", "--server-name", help="The server name to generate a config file for"
)
config_args, remaining_args = config_parser.parse_known_args(argv)
@@ -257,8 +248,8 @@ class Config(object):
if config_args.generate_config:
if config_args.report_stats is None:
config_parser.error(
- "Please specify either --report-stats=yes or --report-stats=no\n\n" +
- MISSING_REPORT_STATS_SPIEL
+ "Please specify either --report-stats=yes or --report-stats=no\n\n"
+ + MISSING_REPORT_STATS_SPIEL
)
if not config_files:
config_parser.error(
@@ -287,26 +278,32 @@ class Config(object):
config_dir_path=config_dir_path,
server_name=server_name,
report_stats=(config_args.report_stats == "yes"),
- is_generating_file=True
+ is_generating_file=True,
)
obj.invoke_all("generate_files", config)
config_file.write(config_str)
- print((
- "A config file has been generated in %r for server name"
- " %r with corresponding SSL keys and self-signed"
- " certificates. Please review this file and customise it"
- " to your needs."
- ) % (config_path, server_name))
+ print(
+ (
+ "A config file has been generated in %r for server name"
+ " %r with corresponding SSL keys and self-signed"
+ " certificates. Please review this file and customise it"
+ " to your needs."
+ )
+ % (config_path, server_name)
+ )
print(
"If this server name is incorrect, you will need to"
" regenerate the SSL certificates"
)
return
else:
- print((
- "Config file %r already exists. Generating any missing key"
- " files."
- ) % (config_path,))
+ print(
+ (
+ "Config file %r already exists. Generating any missing key"
+ " files."
+ )
+ % (config_path,)
+ )
generate_keys = True
parser = argparse.ArgumentParser(
@@ -338,8 +335,7 @@ class Config(object):
return obj
- def read_config_files(self, config_files, keys_directory=None,
- generate_keys=False):
+ def read_config_files(self, config_files, keys_directory=None, generate_keys=False):
if not keys_directory:
keys_directory = os.path.dirname(config_files[-1])
@@ -364,8 +360,9 @@ class Config(object):
if "report_stats" not in config:
raise ConfigError(
- MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
- MISSING_REPORT_STATS_SPIEL
+ MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS
+ + "\n"
+ + MISSING_REPORT_STATS_SPIEL
)
if generate_keys:
@@ -399,16 +396,16 @@ def find_config_files(search_paths):
for entry in os.listdir(config_path):
entry_path = os.path.join(config_path, entry)
if not os.path.isfile(entry_path):
- print (
- "Found subdirectory in config directory: %r. IGNORING."
- ) % (entry_path, )
+ err = "Found subdirectory in config directory: %r. IGNORING."
+ print(err % (entry_path,))
continue
if not entry.endswith(".yaml"):
- print (
- "Found file in config directory that does not"
- " end in '.yaml': %r. IGNORING."
- ) % (entry_path, )
+ err = (
+ "Found file in config directory that does not end in "
+ "'.yaml': %r. IGNORING."
+ )
+ print(err % (entry_path,))
continue
files.append(entry_path)
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index e2582cfecc..93d70cff14 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -19,18 +19,12 @@ from __future__ import print_function
import email.utils
import logging
import os
-import sys
-import textwrap
-from ._base import Config
+import pkg_resources
-logger = logging.getLogger(__name__)
+from ._base import Config, ConfigError
-TEMPLATE_DIR_WARNING = """\
-WARNING: The email notifier is configured to look for templates in '%(template_dir)s',
-but no templates could be found there. We will fall back to using the example templates;
-to get rid of this warning, leave 'email.template_dir' unset.
-"""
+logger = logging.getLogger(__name__)
class EmailConfig(Config):
@@ -78,20 +72,22 @@ class EmailConfig(Config):
self.email_notif_template_html = email_config["notif_template_html"]
self.email_notif_template_text = email_config["notif_template_text"]
- self.email_template_dir = email_config.get("template_dir")
-
- # backwards-compatibility hack
- if (
- self.email_template_dir == "res/templates"
- and not os.path.isfile(
- os.path.join(self.email_template_dir, self.email_notif_template_text)
+ template_dir = email_config.get("template_dir")
+ # we need an absolute path, because we change directory after starting (and
+ # we don't yet know what auxilliary templates like mail.css we will need).
+ # (Note that loading as package_resources with jinja.PackageLoader doesn't
+ # work for the same reason.)
+ if not template_dir:
+ template_dir = pkg_resources.resource_filename(
+ 'synapse', 'res/templates'
)
- ):
- t = TEMPLATE_DIR_WARNING % {
- "template_dir": self.email_template_dir,
- }
- print(textwrap.fill(t, width=80) + "\n", file=sys.stderr)
- self.email_template_dir = None
+ template_dir = os.path.abspath(template_dir)
+
+ for f in self.email_notif_template_text, self.email_notif_template_html:
+ p = os.path.join(template_dir, f)
+ if not os.path.isfile(p):
+ raise ConfigError("Unable to find email template file %s" % (p, ))
+ self.email_template_dir = template_dir
self.email_notif_for_new_users = email_config.get(
"notif_for_new_users", True
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 0fb964eb67..7480ed5145 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -15,10 +15,10 @@
from distutils.util import strtobool
+from synapse.config._base import Config, ConfigError
+from synapse.types import RoomAlias
from synapse.util.stringutils import random_string_with_symbols
-from ._base import Config
-
class RegistrationConfig(Config):
@@ -44,6 +44,10 @@ class RegistrationConfig(Config):
)
self.auto_join_rooms = config.get("auto_join_rooms", [])
+ for room_alias in self.auto_join_rooms:
+ if not RoomAlias.is_valid(room_alias):
+ raise ConfigError('Invalid auto_join_rooms entry %s' % (room_alias,))
+ self.autocreate_auto_join_rooms = config.get("autocreate_auto_join_rooms", True)
def default_config(self, **kwargs):
registration_shared_secret = random_string_with_symbols(50)
@@ -98,6 +102,13 @@ class RegistrationConfig(Config):
# to these rooms
#auto_join_rooms:
# - "#example:example.com"
+
+ # Where auto_join_rooms are specified, setting this flag ensures that the
+ # the rooms exist by creating them when the first user on the
+ # homeserver registers.
+ # Setting to false means that if the rooms are not manually created,
+ # users cannot be auto-joined since they do not exist.
+ autocreate_auto_join_rooms: true
""" % locals()
def add_arguments(self, parser):
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index fc909c1fac..06c62ab62c 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -178,7 +178,7 @@ class ContentRepositoryConfig(Config):
def default_config(self, **kwargs):
media_store = self.default_path("media_store")
uploads_path = self.default_path("uploads")
- return """
+ return r"""
# Directory where uploaded images and attachments are stored.
media_store_path: "%(media_store)s"
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index 57d4665e84..080c81f14b 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -55,7 +55,7 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
raise IOError("Cannot get key for %r" % server_name)
except (ConnectError, DomainError) as e:
logger.warn("Error getting key for %r: %s", server_name, e)
- except Exception as e:
+ except Exception:
logger.exception("Error getting key for %r", server_name)
raise IOError("Cannot get key for %r" % server_name)
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index af3eee95b9..d4d4474847 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -690,7 +690,7 @@ def auth_types_for_event(event):
auth_types = []
auth_types.append((EventTypes.PowerLevels, "", ))
- auth_types.append((EventTypes.Member, event.user_id, ))
+ auth_types.append((EventTypes.Member, event.sender, ))
auth_types.append((EventTypes.Create, "", ))
if event.type == EventTypes.Member:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d041c26824..0f9302a6a8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -788,7 +788,7 @@ class FederationHandlerRegistry(object):
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
- except Exception as e:
+ except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
import pymacaroons
from canonicaljson import json
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.web.client import PartialDownloadError
import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
+from synapse.util import logcontext
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
from ._base import BaseHandler
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(),
- self.hs.get_reactor().getThreadPool(),
- _do_validate_hash,
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
None
"""
if not self._user_parter_running:
- run_in_background(self._user_parter_loop)
+ run_as_background_process("user_parter_loop", self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cab57a8849..cd5b9bbb19 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -53,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
-from synapse.state import resolve_events_with_factory
+from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
@@ -384,24 +384,24 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
- # Resolve any conflicting state
- @defer.inlineCallbacks
- def fetch(ev_ids):
- fetched = yield self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
- # add any events we fetch here to the `event_map` so that we
- # can use them to build the state event list below.
- event_map.update(fetched)
- defer.returnValue(fetched)
-
room_version = yield self.store.get_room_version(room_id)
- state_map = yield resolve_events_with_factory(
- room_version, state_maps, event_map, fetch,
+ state_map = yield resolve_events_with_store(
+ room_version, state_maps, event_map,
+ state_res_store=StateResolutionStore(self.store),
)
- # we need to give _process_received_pdu the actual state events
+ # We need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now.
+
+ # First though we need to fetch all the events that are in
+ # state_map, so we can build up the state below.
+ evs = yield self.store.get_events(
+ list(state_map.values()),
+ get_prev_content=False,
+ check_redacted=False,
+ )
+ event_map.update(evs)
+
state = [
event_map[e] for e in six.itervalues(state_map)
]
@@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
+ yield self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
- self.pusher_pool.on_new_notifications(
+ return self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
)
else:
destination = get_domain_from_id(group_id)
- return getattr(self.transport_client, func_name)(
+ d = getattr(self.transport_client, func_name)(
destination, group_id, *args, **kwargs
)
+
+ # Capture errors returned by the remote homeserver and
+ # re-throw specific errors as SynapseErrors. This is so
+ # when the remote end responds with things like 403 Not
+ # In Group, we can communicate that to the client instead
+ # of a 500.
+ def h(failure):
+ failure.trap(HttpResponseException)
+ e = failure.value
+ if e.code == 403:
+ raise e.to_synapse_error()
+ return failure
+ d.addErrback(h)
+ return d
return f
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4954b23a0d..6c4fcfb10a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -779,7 +779,7 @@ class EventCreationHandler(object):
event, context=context
)
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6f3181f09..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
- self.hs.get_pusherpool().on_new_receipts(
+ yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index da914c46ff..e9d7b25a36 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -50,6 +50,7 @@ class RegistrationHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
+ self.room_creation_handler = self.hs.get_room_creation_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
self._next_generated_user_id = None
@@ -220,9 +221,36 @@ class RegistrationHandler(BaseHandler):
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
+
+ # try to create the room if we're the first user on the server
+ should_auto_create_rooms = False
+ if self.hs.config.autocreate_auto_join_rooms:
+ count = yield self.store.count_all_users()
+ should_auto_create_rooms = count == 1
+
for r in self.hs.config.auto_join_rooms:
try:
- yield self._join_user_to_room(fake_requester, r)
+ if should_auto_create_rooms:
+ room_alias = RoomAlias.from_string(r)
+ if self.hs.hostname != room_alias.domain:
+ logger.warning(
+ 'Cannot create room alias %s, '
+ 'it does not match server domain',
+ r,
+ )
+ else:
+ # create room expects the localpart of the room alias
+ room_alias_localpart = room_alias.localpart
+ yield self.room_creation_handler.create_room(
+ fake_requester,
+ config={
+ "preset": "public_chat",
+ "room_alias_name": room_alias_localpart
+ },
+ ratelimit=False,
+ )
+ else:
+ yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
"""
return self.store.search_user_dir(user_id, search_term, limit)
- @defer.inlineCallbacks
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
if self._is_processing:
return
+ @defer.inlineCallbacks
+ def process():
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
self._is_processing = True
- try:
- yield self._unsafe_process()
- finally:
- self._is_processing = False
+ run_as_background_process("user_directory.notify_new_event", process)
@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index fcc02fc77d..24b6110c20 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object):
Returns:
Deferred: resolves with the http response object on success.
- Fails with ``HTTPRequestException``: if we get an HTTP response
+ Fails with ``HttpResponseException``: if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -480,7 +480,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -534,7 +534,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -589,7 +589,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -640,7 +640,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -684,7 +684,7 @@ class MatrixFederationHttpClient(object):
Deferred: resolves with an (int,dict) tuple of the file length and
a dict of the response headers.
- Fails with ``HTTPRequestException`` if we get an HTTP response code
+ Fails with ``HttpResponseException`` if we get an HTTP response code
>= 300
Fails with ``NotRetryingDestination`` if we are not yet ready
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index fedb4e6b18..62045a918b 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -39,7 +39,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
- "synapse_http_server_response_time_seconds", "sec",
+ "synapse_http_server_response_time_seconds",
+ "sec",
["method", "servlet", "tag", "code"],
)
@@ -79,15 +80,11 @@ response_size = Counter(
# than when the response was written.
in_flight_requests_ru_utime = Counter(
- "synapse_http_server_in_flight_requests_ru_utime_seconds",
- "",
- ["method", "servlet"],
+ "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]
)
in_flight_requests_ru_stime = Counter(
- "synapse_http_server_in_flight_requests_ru_stime_seconds",
- "",
- ["method", "servlet"],
+ "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]
)
in_flight_requests_db_txn_count = Counter(
@@ -134,7 +131,7 @@ def _get_in_flight_counts():
# type
counts = {}
for rm in reqs:
- key = (rm.method, rm.name,)
+ key = (rm.method, rm.name)
counts[key] = counts.get(key, 0) + 1
return counts
@@ -175,7 +172,8 @@ class RequestMetrics(object):
if context != self.start_context:
logger.warn(
"Context have unexpectedly changed %r, %r",
- context, self.start_context
+ context,
+ self.start_context,
)
return
@@ -192,10 +190,10 @@ class RequestMetrics(object):
resource_usage = context.get_resource_usage()
response_ru_utime.labels(self.method, self.name, tag).inc(
- resource_usage.ru_utime,
+ resource_usage.ru_utime
)
response_ru_stime.labels(self.method, self.name, tag).inc(
- resource_usage.ru_stime,
+ resource_usage.ru_stime
)
response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
@@ -222,8 +220,15 @@ class RequestMetrics(object):
diff = new_stats - self._request_stats
self._request_stats = new_stats
- in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
- in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
+ # max() is used since rapid use of ru_stime/ru_utime can end up with the
+ # count going backwards due to NTP, time smearing, fine-grained
+ # correction, or floating points. Who knows, really?
+ in_flight_requests_ru_utime.labels(self.method, self.name).inc(
+ max(diff.ru_utime, 0)
+ )
+ in_flight_requests_ru_stime.labels(self.method, self.name).inc(
+ max(diff.ru_stime, 0)
+ )
in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
diff.db_txn_count
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 340b16ce25..de02b1017e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -186,9 +186,9 @@ class Notifier(object):
def count_listeners():
all_user_streams = set()
- for x in self.room_to_user_streams.values():
+ for x in list(self.room_to_user_streams.values()):
all_user_streams |= x
- for x in self.user_to_user_stream.values():
+ for x in list(self.user_to_user_stream.values()):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
@@ -196,7 +196,7 @@ class Notifier(object):
LaterGauge(
"synapse_notifier_rooms", "", [],
- lambda: count(bool, self.room_to_user_streams.values()),
+ lambda: count(bool, list(self.room_to_user_streams.values())),
)
LaterGauge(
"synapse_notifier_users", "", [],
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..f369124258 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
@@ -71,18 +70,11 @@ class EmailPusher(object):
# See httppusher
self.max_stream_ordering = None
- self.processing = False
+ self._is_processing = False
- @defer.inlineCallbacks
def on_started(self):
if self.mailer is not None:
- try:
- self.throttle_params = yield self.store.get_throttle_params_by_room(
- self.pusher_id
- )
- yield self._process()
- except Exception:
- logger.exception("Error starting email pusher")
+ self._start_processing()
def on_stop(self):
if self.timed_call:
@@ -92,43 +84,52 @@ class EmailPusher(object):
pass
self.timed_call = None
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
- yield self._process()
+ self._start_processing()
def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a
# lot of read receipts so it's probably less work to just let the
# timer fire
- return defer.succeed(None)
+ pass
- @defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
- yield self._process()
+ self._start_processing()
+
+ def _start_processing(self):
+ if self._is_processing:
+ return
+
+ run_as_background_process("emailpush.process", self._process)
@defer.inlineCallbacks
def _process(self):
- if self.processing:
- return
+ # we should never get here if we are already processing
+ assert not self._is_processing
+
+ try:
+ self._is_processing = True
+
+ if self.throttle_params is None:
+ # this is our first loop: load up the throttle params
+ self.throttle_params = yield self.store.get_throttle_params_by_room(
+ self.pusher_id
+ )
- with LoggingContext("emailpush._process"):
- with Measure(self.clock, "emailpush._process"):
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 48abd5e4d6..6bd703632d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,9 +22,8 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
@@ -61,7 +60,7 @@ class HttpPusher(object):
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusherdict['failing_since']
self.timed_call = None
- self.processing = False
+ self._is_processing = False
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
@@ -92,34 +91,27 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
- @defer.inlineCallbacks
def on_started(self):
- try:
- yield self._process()
- except Exception:
- logger.exception("Error starting http pusher")
+ self._start_processing()
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
- yield self._process()
+ self._start_processing()
- @defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id):
# Note that the min here shouldn't be relied upon to be accurate.
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
- with LoggingContext("push.on_new_receipts"):
- with Measure(self.clock, "push.on_new_receipts"):
- badge = yield push_tools.get_badge_count(
- self.hs.get_datastore(), self.user_id
- )
- yield self._send_badge(badge)
+ run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
@defer.inlineCallbacks
+ def _update_badge(self):
+ badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+ yield self._send_badge(badge)
+
def on_timer(self):
- yield self._process()
+ self._start_processing()
def on_stop(self):
if self.timed_call:
@@ -129,27 +121,31 @@ class HttpPusher(object):
pass
self.timed_call = None
+ def _start_processing(self):
+ if self._is_processing:
+ return
+
+ run_as_background_process("httppush.process", self._process)
+
@defer.inlineCallbacks
def _process(self):
- if self.processing:
- return
+ # we should never get here if we are already processing
+ assert not self._is_processing
- with LoggingContext("push._process"):
- with Measure(self.clock, "push._process"):
+ try:
+ self._is_processing = True
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index b9dcfee740..16fb5e8471 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -526,12 +526,8 @@ def load_jinja2_templates(config):
Returns:
(notif_template_html, notif_template_text)
"""
- logger.info("loading jinja2")
-
- if config.email_template_dir:
- loader = jinja2.FileSystemLoader(config.email_template_dir)
- else:
- loader = jinja2.PackageLoader('synapse', 'res/templates')
+ logger.info("loading email templates from '%s'", config.email_template_dir)
+ loader = jinja2.FileSystemLoader(config.email_template_dir)
env = jinja2.Environment(loader=loader)
env.filters["format_ts"] = format_ts_filter
env.filters["mxc_to_http"] = _create_mxc_to_http_filter(config)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 9f7d5ef217..5a4e73ccd6 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -20,24 +20,39 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class PusherPool:
+ """
+ The pusher pool. This is responsible for dispatching notifications of new events to
+ the http and email pushers.
+
+ It provides three methods which are designed to be called by the rest of the
+ application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
+ delegates to each of the relevant pushers.
+
+ Note that it is expected that each pusher will have its own 'processing' loop which
+ will send out the notifications in the background, rather than blocking until the
+ notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
+ Pusher.on_new_receipts are not expected to return deferreds.
+ """
def __init__(self, _hs):
self.hs = _hs
self.pusher_factory = PusherFactory(_hs)
- self.start_pushers = _hs.config.start_pushers
+ self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.pushers = {}
- @defer.inlineCallbacks
def start(self):
- pushers = yield self.store.get_all_pushers()
- self._start_pushers(pushers)
+ """Starts the pushers off in a background process.
+ """
+ if not self._should_start_pushers:
+ logger.info("Not starting pushers because they are disabled in the config")
+ return
+ run_as_background_process("start_pushers", self._start_pushers)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@@ -86,7 +101,7 @@ class PusherPool:
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
)
- yield self._refresh_pusher(app_id, pushkey, user_id)
+ yield self.start_pusher_by_id(app_id, pushkey, user_id)
@defer.inlineCallbacks
def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@@ -123,45 +138,23 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'],
)
- def on_new_notifications(self, min_stream_id, max_stream_id):
- run_as_background_process(
- "on_new_notifications",
- self._on_new_notifications, min_stream_id, max_stream_id,
- )
-
@defer.inlineCallbacks
- def _on_new_notifications(self, min_stream_id, max_stream_id):
+ def on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
)
- deferreds = []
-
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- deferreds.append(
- run_in_background(
- p.on_new_notifications,
- min_stream_id, max_stream_id,
- )
- )
-
- yield make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True),
- )
+ p.on_new_notifications(min_stream_id, max_stream_id)
+
except Exception:
logger.exception("Exception in pusher on_new_notifications")
- def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
- run_as_background_process(
- "on_new_receipts",
- self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
- )
-
@defer.inlineCallbacks
- def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@@ -171,26 +164,20 @@ class PusherPool:
# This returns a tuple, user_id is at index 3
users_affected = set([r[3] for r in updated_receipts])
- deferreds = []
-
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- deferreds.append(
- run_in_background(
- p.on_new_receipts,
- min_stream_id, max_stream_id,
- )
- )
-
- yield make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True),
- )
+ p.on_new_receipts(min_stream_id, max_stream_id)
+
except Exception:
logger.exception("Exception in pusher on_new_receipts")
@defer.inlineCallbacks
- def _refresh_pusher(self, app_id, pushkey, user_id):
+ def start_pusher_by_id(self, app_id, pushkey, user_id):
+ """Look up the details for the given pusher, and start it"""
+ if not self._should_start_pushers:
+ return
+
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id, pushkey
)
@@ -201,33 +188,49 @@ class PusherPool:
p = r
if p:
+ self._start_pusher(p)
- self._start_pushers([p])
+ @defer.inlineCallbacks
+ def _start_pushers(self):
+ """Start all the pushers
- def _start_pushers(self, pushers):
- if not self.start_pushers:
- logger.info("Not starting pushers because they are disabled in the config")
- return
+ Returns:
+ Deferred
+ """
+ pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
- try:
- p = self.pusher_factory.create_pusher(pusherdict)
- except Exception:
- logger.exception("Couldn't start a pusher: caught Exception")
- continue
- if p:
- appid_pushkey = "%s:%s" % (
- pusherdict['app_id'],
- pusherdict['pushkey'],
- )
- byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+ self._start_pusher(pusherdict)
+ logger.info("Started pushers")
- if appid_pushkey in byuser:
- byuser[appid_pushkey].on_stop()
- byuser[appid_pushkey] = p
- run_in_background(p.on_started)
+ def _start_pusher(self, pusherdict):
+ """Start the given pusher
- logger.info("Started pushers")
+ Args:
+ pusherdict (dict):
+
+ Returns:
+ None
+ """
+ try:
+ p = self.pusher_factory.create_pusher(pusherdict)
+ except Exception:
+ logger.exception("Couldn't start a pusher: caught Exception")
+ return
+
+ if not p:
+ return
+
+ appid_pushkey = "%s:%s" % (
+ pusherdict['app_id'],
+ pusherdict['pushkey'],
+ )
+ byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+
+ if appid_pushkey in byuser:
+ byuser[appid_pushkey].on_stop()
+ byuser[appid_pushkey] = p
+ p.on_started()
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index f51184b50d..943876456b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -53,6 +53,7 @@ REQUIREMENTS = {
"pillow>=3.1.2": ["PIL"],
"pydenticon>=0.2": ["pydenticon"],
"sortedcontainers>=1.4.4": ["sortedcontainers"],
+ "psutil>=2.0.0": ["psutil>=2.0.0"],
"pysaml2>=3.0.0": ["saml2"],
"pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.4.2": ["msgpack"],
@@ -79,9 +80,6 @@ CONDITIONAL_REQUIREMENTS = {
"matrix-synapse-ldap3": {
"matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"],
},
- "psutil": {
- "psutil>=2.0.0": ["psutil>=2.0.0"],
- },
"postgres": {
"psycopg2>=2.6": ["psycopg2"]
}
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index bd8b5f4afa..693b303881 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -99,7 +99,7 @@ class AuthRestServlet(RestServlet):
cannot be handled in the normal flow (with requests to the same endpoint).
Current use is for web fallback auth.
"""
- PATTERNS = client_v2_patterns("/auth/(?P<stagetype>[\w\.]*)/fallback/web")
+ PATTERNS = client_v2_patterns(r"/auth/(?P<stagetype>[\w\.]*)/fallback/web")
def __init__(self, hs):
super(AuthRestServlet, self).__init__()
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index a828ff4438..08b1867fab 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse
import twisted.internet.error
import twisted.web.http
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.web.resource import Resource
from synapse.api.errors import (
@@ -36,8 +36,8 @@ from synapse.api.errors import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
from synapse.util.async_helpers import Linearizer
-from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import is_ascii, random_string
@@ -492,10 +492,11 @@ class MediaRepository(object):
))
thumbnailer = Thumbnailer(input_path)
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
- ))
+ )
if t_byte_source:
try:
@@ -534,10 +535,11 @@ class MediaRepository(object):
))
thumbnailer = Thumbnailer(input_path)
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
- ))
+ )
if t_byte_source:
try:
@@ -620,15 +622,17 @@ class MediaRepository(object):
for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
# Generate the thumbnail
if t_method == "crop":
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
thumbnailer.crop,
t_width, t_height, t_type,
- ))
+ )
elif t_method == "scale":
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
thumbnailer.scale,
t_width, t_height, t_type,
- ))
+ )
else:
logger.error("Unrecognized method: %r", t_method)
continue
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index a6189224ee..896078fe76 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -21,9 +21,10 @@ import sys
import six
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.protocols.basic import FileSender
+from synapse.util import logcontext
from synapse.util.file_consumer import BackgroundFileConsumer
from synapse.util.logcontext import make_deferred_yieldable
@@ -64,9 +65,10 @@ class MediaStorage(object):
with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
- yield make_deferred_yieldable(threads.deferToThread(
+ yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
_write_file_synchronously, source, f,
- ))
+ )
yield finish_cb()
defer.returnValue(fname)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 8c892ff187..1a7bfd6b56 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -674,7 +674,7 @@ def summarize_paragraphs(text_nodes, min_size=200, max_size=500):
# This splits the paragraph into words, but keeping the
# (preceeding) whitespace intact so we can easily concat
# words back together.
- for match in re.finditer("\s*\S+", description):
+ for match in re.finditer(r"\s*\S+", description):
word = match.group()
# Keep adding words while the total length is less than
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 7b9f8b4d79..5aa03031f6 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,9 +17,10 @@ import logging
import os
import shutil
-from twisted.internet import defer, threads
+from twisted.internet import defer
from synapse.config._base import Config
+from synapse.util import logcontext
from synapse.util.logcontext import run_in_background
from .media_storage import FileResponder
@@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider):
if not os.path.exists(dirname):
os.makedirs(dirname)
- return threads.deferToThread(
+ return logcontext.defer_to_thread(
+ self.hs.get_reactor(),
shutil.copyfile, primary_fname, backup_fname,
)
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index b22495c1f9..9b40b18d5b 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -19,13 +19,14 @@ from collections import namedtuple
from six import iteritems, itervalues
+import attr
from frozendict import frozendict
from twisted.internet import defer
from synapse.api.constants import EventTypes, RoomVersions
from synapse.events.snapshot import EventContext
-from synapse.state import v1
+from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
@@ -372,15 +373,10 @@ class StateHandler(object):
result = yield self._state_resolution_handler.resolve_state_groups(
room_id, room_version, state_groups_ids, None,
- self._state_map_factory,
+ state_res_store=StateResolutionStore(self.store),
)
defer.returnValue(result)
- def _state_map_factory(self, ev_ids):
- return self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
-
@defer.inlineCallbacks
def resolve_events(self, room_version, state_sets, event):
logger.info(
@@ -398,10 +394,10 @@ class StateHandler(object):
}
with Measure(self.clock, "state._resolve_events"):
- new_state = yield resolve_events_with_factory(
+ new_state = yield resolve_events_with_store(
room_version, state_set_ids,
event_map=state_map,
- state_map_factory=self._state_map_factory
+ state_res_store=StateResolutionStore(self.store),
)
new_state = {
@@ -436,7 +432,7 @@ class StateResolutionHandler(object):
@defer.inlineCallbacks
@log_function
def resolve_state_groups(
- self, room_id, room_version, state_groups_ids, event_map, state_map_factory,
+ self, room_id, room_version, state_groups_ids, event_map, state_res_store,
):
"""Resolves conflicts between a set of state groups
@@ -454,9 +450,11 @@ class StateResolutionHandler(object):
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
- events will be requested via state_map_factory.
+ events will be requested via state_res_store.
+
+ If None, all events will be fetched via state_res_store.
- If None, all events will be fetched via state_map_factory.
+ state_res_store (StateResolutionStore)
Returns:
Deferred[_StateCacheEntry]: resolved state
@@ -480,10 +478,10 @@ class StateResolutionHandler(object):
# start by assuming we won't have any conflicted state, and build up the new
# state map by iterating through the state groups. If we discover a conflict,
- # we give up and instead use `resolve_events_with_factory`.
+ # we give up and instead use `resolve_events_with_store`.
#
# XXX: is this actually worthwhile, or should we just let
- # resolve_events_with_factory do it?
+ # resolve_events_with_store do it?
new_state = {}
conflicted_state = False
for st in itervalues(state_groups_ids):
@@ -498,11 +496,11 @@ class StateResolutionHandler(object):
if conflicted_state:
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
- new_state = yield resolve_events_with_factory(
+ new_state = yield resolve_events_with_store(
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
- state_map_factory=state_map_factory,
+ state_res_store=state_res_store,
)
# if the new state matches any of the input state groups, we can
@@ -583,7 +581,7 @@ def _make_state_cache_entry(
)
-def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory):
+def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
"""
Args:
room_version(str): Version of the room
@@ -599,17 +597,19 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
If None, all events will be fetched via state_map_factory.
- state_map_factory(func): will be called
- with a list of event_ids that are needed, and should return with
- a Deferred of dict of event_id to event.
+ state_res_store (StateResolutionStore)
Returns
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
- if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
- return v1.resolve_events_with_factory(
- state_sets, event_map, state_map_factory,
+ if room_version == RoomVersions.V1:
+ return v1.resolve_events_with_store(
+ state_sets, event_map, state_res_store.get_events,
+ )
+ elif room_version == RoomVersions.VDH_TEST:
+ return v2.resolve_events_with_store(
+ state_sets, event_map, state_res_store,
)
else:
# This should only happen if we added a version but forgot to add it to
@@ -617,3 +617,54 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
raise Exception(
"No state resolution algorithm defined for version %r" % (room_version,)
)
+
+
+@attr.s
+class StateResolutionStore(object):
+ """Interface that allows state resolution algorithms to access the database
+ in well defined way.
+
+ Args:
+ store (DataStore)
+ """
+
+ store = attr.ib()
+
+ def get_events(self, event_ids, allow_rejected=False):
+ """Get events from the database
+
+ Args:
+ event_ids (list): The event_ids of the events to fetch
+ allow_rejected (bool): If True return rejected events.
+
+ Returns:
+ Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
+ """
+
+ return self.store.get_events(
+ event_ids,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=allow_rejected,
+ )
+
+ def get_auth_chain(self, event_ids):
+ """Gets the full auth chain for a set of events (including rejected
+ events).
+
+ Includes the given event IDs in the result.
+
+ Note that:
+ 1. All events must be state events.
+ 2. For v1 rooms this may not have the full auth chain in the
+ presence of rejected events
+
+ Args:
+ event_ids (list): The event IDs of the events to fetch the auth
+ chain for. Must be state events.
+
+ Returns:
+ Deferred[list[str]]: List of event IDs of the auth chain.
+ """
+
+ return self.store.get_auth_chain_ids(event_ids, include_given=True)
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index 7a7157b352..70a981f4a2 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -31,7 +31,7 @@ POWER_KEY = (EventTypes.PowerLevels, "")
@defer.inlineCallbacks
-def resolve_events_with_factory(state_sets, event_map, state_map_factory):
+def resolve_events_with_store(state_sets, event_map, state_map_factory):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
new file mode 100644
index 0000000000..5d06f7e928
--- /dev/null
+++ b/synapse/state/v2.py
@@ -0,0 +1,544 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import heapq
+import itertools
+import logging
+
+from six import iteritems, itervalues
+
+from twisted.internet import defer
+
+from synapse import event_auth
+from synapse.api.constants import EventTypes
+from synapse.api.errors import AuthError
+
+logger = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def resolve_events_with_store(state_sets, event_map, state_res_store):
+ """Resolves the state using the v2 state resolution algorithm
+
+ Args:
+ state_sets(list): List of dicts of (type, state_key) -> event_id,
+ which are the different state groups to resolve.
+
+ event_map(dict[str,FrozenEvent]|None):
+ a dict from event_id to event, for any events that we happen to
+ have in flight (eg, those currently being persisted). This will be
+ used as a starting point fof finding the state we need; any missing
+ events will be requested via state_res_store.
+
+ If None, all events will be fetched via state_res_store.
+
+ state_res_store (StateResolutionStore)
+
+ Returns
+ Deferred[dict[(str, str), str]]:
+ a map from (type, state_key) to event_id.
+ """
+
+ logger.debug("Computing conflicted state")
+
+ # First split up the un/conflicted state
+ unconflicted_state, conflicted_state = _seperate(state_sets)
+
+ if not conflicted_state:
+ defer.returnValue(unconflicted_state)
+
+ logger.debug("%d conflicted state entries", len(conflicted_state))
+ logger.debug("Calculating auth chain difference")
+
+ # Also fetch all auth events that appear in only some of the state sets'
+ # auth chains.
+ auth_diff = yield _get_auth_chain_difference(
+ state_sets, event_map, state_res_store,
+ )
+
+ full_conflicted_set = set(itertools.chain(
+ itertools.chain.from_iterable(itervalues(conflicted_state)),
+ auth_diff,
+ ))
+
+ events = yield state_res_store.get_events([
+ eid for eid in full_conflicted_set
+ if eid not in event_map
+ ], allow_rejected=True)
+ event_map.update(events)
+
+ full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map)
+
+ logger.debug("%d full_conflicted_set entries", len(full_conflicted_set))
+
+ # Get and sort all the power events (kicks/bans/etc)
+ power_events = (
+ eid for eid in full_conflicted_set
+ if _is_power_event(event_map[eid])
+ )
+
+ sorted_power_events = yield _reverse_topological_power_sort(
+ power_events,
+ event_map,
+ state_res_store,
+ full_conflicted_set,
+ )
+
+ logger.debug("sorted %d power events", len(sorted_power_events))
+
+ # Now sequentially auth each one
+ resolved_state = yield _iterative_auth_checks(
+ sorted_power_events, unconflicted_state, event_map,
+ state_res_store,
+ )
+
+ logger.debug("resolved power events")
+
+ # OK, so we've now resolved the power events. Now sort the remaining
+ # events using the mainline of the resolved power level.
+
+ leftover_events = [
+ ev_id
+ for ev_id in full_conflicted_set
+ if ev_id not in sorted_power_events
+ ]
+
+ logger.debug("sorting %d remaining events", len(leftover_events))
+
+ pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
+ leftover_events = yield _mainline_sort(
+ leftover_events, pl, event_map, state_res_store,
+ )
+
+ logger.debug("resolving remaining events")
+
+ resolved_state = yield _iterative_auth_checks(
+ leftover_events, resolved_state, event_map,
+ state_res_store,
+ )
+
+ logger.debug("resolved")
+
+ # We make sure that unconflicted state always still applies.
+ resolved_state.update(unconflicted_state)
+
+ logger.debug("done")
+
+ defer.returnValue(resolved_state)
+
+
+@defer.inlineCallbacks
+def _get_power_level_for_sender(event_id, event_map, state_res_store):
+ """Return the power level of the sender of the given event according to
+ their auth events.
+
+ Args:
+ event_id (str)
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[int]
+ """
+ event = yield _get_event(event_id, event_map, state_res_store)
+
+ pl = None
+ for aid, _ in event.auth_events:
+ aev = yield _get_event(aid, event_map, state_res_store)
+ if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
+ pl = aev
+ break
+
+ if pl is None:
+ # Couldn't find power level. Check if they're the creator of the room
+ for aid, _ in event.auth_events:
+ aev = yield _get_event(aid, event_map, state_res_store)
+ if (aev.type, aev.state_key) == (EventTypes.Create, ""):
+ if aev.content.get("creator") == event.sender:
+ defer.returnValue(100)
+ break
+ defer.returnValue(0)
+
+ level = pl.content.get("users", {}).get(event.sender)
+ if level is None:
+ level = pl.content.get("users_default", 0)
+
+ if level is None:
+ defer.returnValue(0)
+ else:
+ defer.returnValue(int(level))
+
+
+@defer.inlineCallbacks
+def _get_auth_chain_difference(state_sets, event_map, state_res_store):
+ """Compare the auth chains of each state set and return the set of events
+ that only appear in some but not all of the auth chains.
+
+ Args:
+ state_sets (list)
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[set[str]]: Set of event IDs
+ """
+ common = set(itervalues(state_sets[0])).intersection(
+ *(itervalues(s) for s in state_sets[1:])
+ )
+
+ auth_sets = []
+ for state_set in state_sets:
+ auth_ids = set(
+ eid
+ for key, eid in iteritems(state_set)
+ if (key[0] in (
+ EventTypes.Member,
+ EventTypes.ThirdPartyInvite,
+ ) or key in (
+ (EventTypes.PowerLevels, ''),
+ (EventTypes.Create, ''),
+ (EventTypes.JoinRules, ''),
+ )) and eid not in common
+ )
+
+ auth_chain = yield state_res_store.get_auth_chain(auth_ids)
+ auth_ids.update(auth_chain)
+
+ auth_sets.append(auth_ids)
+
+ intersection = set(auth_sets[0]).intersection(*auth_sets[1:])
+ union = set().union(*auth_sets)
+
+ defer.returnValue(union - intersection)
+
+
+def _seperate(state_sets):
+ """Return the unconflicted and conflicted state. This is different than in
+ the original algorithm, as this defines a key to be conflicted if one of
+ the state sets doesn't have that key.
+
+ Args:
+ state_sets (list)
+
+ Returns:
+ tuple[dict, dict]: A tuple of unconflicted and conflicted state. The
+ conflicted state dict is a map from type/state_key to set of event IDs
+ """
+ unconflicted_state = {}
+ conflicted_state = {}
+
+ for key in set(itertools.chain.from_iterable(state_sets)):
+ event_ids = set(state_set.get(key) for state_set in state_sets)
+ if len(event_ids) == 1:
+ unconflicted_state[key] = event_ids.pop()
+ else:
+ event_ids.discard(None)
+ conflicted_state[key] = event_ids
+
+ return unconflicted_state, conflicted_state
+
+
+def _is_power_event(event):
+ """Return whether or not the event is a "power event", as defined by the
+ v2 state resolution algorithm
+
+ Args:
+ event (FrozenEvent)
+
+ Returns:
+ boolean
+ """
+ if (event.type, event.state_key) in (
+ (EventTypes.PowerLevels, ""),
+ (EventTypes.JoinRules, ""),
+ (EventTypes.Create, ""),
+ ):
+ return True
+
+ if event.type == EventTypes.Member:
+ if event.membership in ('leave', 'ban'):
+ return event.sender != event.state_key
+
+ return False
+
+
+@defer.inlineCallbacks
+def _add_event_and_auth_chain_to_graph(graph, event_id, event_map,
+ state_res_store, auth_diff):
+ """Helper function for _reverse_topological_power_sort that add the event
+ and its auth chain (that is in the auth diff) to the graph
+
+ Args:
+ graph (dict[str, set[str]]): A map from event ID to the events auth
+ event IDs
+ event_id (str): Event to add to the graph
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+ auth_diff (set[str]): Set of event IDs that are in the auth difference.
+ """
+
+ state = [event_id]
+ while state:
+ eid = state.pop()
+ graph.setdefault(eid, set())
+
+ event = yield _get_event(eid, event_map, state_res_store)
+ for aid, _ in event.auth_events:
+ if aid in auth_diff:
+ if aid not in graph:
+ state.append(aid)
+
+ graph.setdefault(eid, set()).add(aid)
+
+
+@defer.inlineCallbacks
+def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_diff):
+ """Returns a list of the event_ids sorted by reverse topological ordering,
+ and then by power level and origin_server_ts
+
+ Args:
+ event_ids (list[str]): The events to sort
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+ auth_diff (set[str]): Set of event IDs that are in the auth difference.
+
+ Returns:
+ Deferred[list[str]]: The sorted list
+ """
+
+ graph = {}
+ for event_id in event_ids:
+ yield _add_event_and_auth_chain_to_graph(
+ graph, event_id, event_map, state_res_store, auth_diff,
+ )
+
+ event_to_pl = {}
+ for event_id in graph:
+ pl = yield _get_power_level_for_sender(event_id, event_map, state_res_store)
+ event_to_pl[event_id] = pl
+
+ def _get_power_order(event_id):
+ ev = event_map[event_id]
+ pl = event_to_pl[event_id]
+
+ return -pl, ev.origin_server_ts, event_id
+
+ # Note: graph is modified during the sort
+ it = lexicographical_topological_sort(
+ graph,
+ key=_get_power_order,
+ )
+ sorted_events = list(it)
+
+ defer.returnValue(sorted_events)
+
+
+@defer.inlineCallbacks
+def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store):
+ """Sequentially apply auth checks to each event in given list, updating the
+ state as it goes along.
+
+ Args:
+ event_ids (list[str]): Ordered list of events to apply auth checks to
+ base_state (dict[tuple[str, str], str]): The set of state to start with
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[dict[tuple[str, str], str]]: Returns the final updated state
+ """
+ resolved_state = base_state.copy()
+
+ for event_id in event_ids:
+ event = event_map[event_id]
+
+ auth_events = {}
+ for aid, _ in event.auth_events:
+ ev = yield _get_event(aid, event_map, state_res_store)
+
+ if ev.rejected_reason is None:
+ auth_events[(ev.type, ev.state_key)] = ev
+
+ for key in event_auth.auth_types_for_event(event):
+ if key in resolved_state:
+ ev_id = resolved_state[key]
+ ev = yield _get_event(ev_id, event_map, state_res_store)
+
+ if ev.rejected_reason is None:
+ auth_events[key] = event_map[ev_id]
+
+ try:
+ event_auth.check(
+ event, auth_events,
+ do_sig_check=False,
+ do_size_check=False
+ )
+
+ resolved_state[(event.type, event.state_key)] = event_id
+ except AuthError:
+ pass
+
+ defer.returnValue(resolved_state)
+
+
+@defer.inlineCallbacks
+def _mainline_sort(event_ids, resolved_power_event_id, event_map,
+ state_res_store):
+ """Returns a sorted list of event_ids sorted by mainline ordering based on
+ the given event resolved_power_event_id
+
+ Args:
+ event_ids (list[str]): Events to sort
+ resolved_power_event_id (str): The final resolved power level event ID
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[list[str]]: The sorted list
+ """
+ mainline = []
+ pl = resolved_power_event_id
+ while pl:
+ mainline.append(pl)
+ pl_ev = yield _get_event(pl, event_map, state_res_store)
+ auth_events = pl_ev.auth_events
+ pl = None
+ for aid, _ in auth_events:
+ ev = yield _get_event(aid, event_map, state_res_store)
+ if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""):
+ pl = aid
+ break
+
+ mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}
+
+ event_ids = list(event_ids)
+
+ order_map = {}
+ for ev_id in event_ids:
+ depth = yield _get_mainline_depth_for_event(
+ event_map[ev_id], mainline_map,
+ event_map, state_res_store,
+ )
+ order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
+
+ event_ids.sort(key=lambda ev_id: order_map[ev_id])
+
+ defer.returnValue(event_ids)
+
+
+@defer.inlineCallbacks
+def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_store):
+ """Get the mainline depths for the given event based on the mainline map
+
+ Args:
+ event (FrozenEvent)
+ mainline_map (dict[str, int]): Map from event_id to mainline depth for
+ events in the mainline.
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[int]
+ """
+
+ # We do an iterative search, replacing `event with the power level in its
+ # auth events (if any)
+ while event:
+ depth = mainline_map.get(event.event_id)
+ if depth is not None:
+ defer.returnValue(depth)
+
+ auth_events = event.auth_events
+ event = None
+
+ for aid, _ in auth_events:
+ aev = yield _get_event(aid, event_map, state_res_store)
+ if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
+ event = aev
+ break
+
+ # Didn't find a power level auth event, so we just return 0
+ defer.returnValue(0)
+
+
+@defer.inlineCallbacks
+def _get_event(event_id, event_map, state_res_store):
+ """Helper function to look up event in event_map, falling back to looking
+ it up in the store
+
+ Args:
+ event_id (str)
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[FrozenEvent]
+ """
+ if event_id not in event_map:
+ events = yield state_res_store.get_events([event_id], allow_rejected=True)
+ event_map.update(events)
+ defer.returnValue(event_map[event_id])
+
+
+def lexicographical_topological_sort(graph, key):
+ """Performs a lexicographic reverse topological sort on the graph.
+
+ This returns a reverse topological sort (i.e. if node A references B then B
+ appears before A in the sort), with ties broken lexicographically based on
+ return value of the `key` function.
+
+ NOTE: `graph` is modified during the sort.
+
+ Args:
+ graph (dict[str, set[str]]): A representation of the graph where each
+ node is a key in the dict and its value are the nodes edges.
+ key (func): A function that takes a node and returns a value that is
+ comparable and used to order nodes
+
+ Yields:
+ str: The next node in the topological sort
+ """
+
+ # Note, this is basically Kahn's algorithm except we look at nodes with no
+ # outgoing edges, c.f.
+ # https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm
+ outdegree_map = graph
+ reverse_graph = {}
+
+ # Lists of nodes with zero out degree. Is actually a tuple of
+ # `(key(node), node)` so that sorting does the right thing
+ zero_outdegree = []
+
+ for node, edges in iteritems(graph):
+ if len(edges) == 0:
+ zero_outdegree.append((key(node), node))
+
+ reverse_graph.setdefault(node, set())
+ for edge in edges:
+ reverse_graph.setdefault(edge, set()).add(node)
+
+ # heapq is a built in implementation of a sorted queue.
+ heapq.heapify(zero_outdegree)
+
+ while zero_outdegree:
+ _, node = heapq.heappop(zero_outdegree)
+
+ for parent in reverse_graph[node]:
+ out = outdegree_map[parent]
+ out.discard(node)
+ if len(out) == 0:
+ heapq.heappush(zero_outdegree, (key(parent), parent))
+
+ yield node
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index be61147b9b..d9d0255d0b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,7 +18,7 @@ import threading
import time
from six import PY2, iteritems, iterkeys, itervalues
-from six.moves import intern, range
+from six.moves import builtins, intern, range
from canonicaljson import json
from prometheus_client import Histogram
@@ -1233,7 +1233,7 @@ def db_to_json(db_content):
# psycopg2 on Python 2 returns buffer objects, which we need to cast to
# bytes to decode
- if PY2 and isinstance(db_content, buffer):
+ if PY2 and isinstance(db_content, builtins.buffer):
db_content = bytes(db_content)
# Decode it to a Unicode string before feeding it to json.loads, so we
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index cfb687cb53..61a029a53c 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -90,7 +90,7 @@ class DirectoryWorkerStore(SQLBaseStore):
class DirectoryStore(DirectoryWorkerStore):
@defer.inlineCallbacks
def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
- """ Creates an associatin between a room alias and room_id/servers
+ """ Creates an association between a room alias and room_id/servers
Args:
room_alias (RoomAlias)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 03cedf3a75..c780f55277 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
@@ -731,11 +732,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# Ok, we need to defer to the state handler to resolve our state sets.
- def get_events(ev_ids):
- return self.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
-
state_groups = {
sg: state_groups_map[sg] for sg in new_state_groups
}
@@ -745,7 +741,8 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
- room_id, room_version, state_groups, events_map, get_events
+ room_id, room_version, state_groups, events_map,
+ state_res_store=StateResolutionStore(self)
)
defer.returnValue((res.state, None))
@@ -854,6 +851,27 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)
+ # We want to store event_auth mappings for rejected events, as they're
+ # used in state res v2.
+ # This is only necessary if the rejected event appears in an accepted
+ # event's auth chain, but its easier for now just to store them (and
+ # it doesn't take much storage compared to storing the entire event
+ # anyway).
+ self._simple_insert_many_txn(
+ txn,
+ table="event_auth",
+ values=[
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "auth_id": auth_id,
+ }
+ for event, _ in events_and_contexts
+ for auth_id, _ in event.auth_events
+ if event.is_state()
+ ],
+ )
+
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
events_and_contexts = self._store_rejected_events_txn(
@@ -1329,21 +1347,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
txn, event.room_id, event.redacts
)
- self._simple_insert_many_txn(
- txn,
- table="event_auth",
- values=[
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "auth_id": auth_id,
- }
- for event, _ in events_and_contexts
- for auth_id, _ in event.auth_events
- if event.is_state()
- ],
- )
-
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a1331c1a61..8af17921e3 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c7987bfcdd..2743b52bad 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 26b429e307..2dd14aba1c 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -567,7 +567,7 @@ class RegistrationStore(RegistrationWorkerStore,
def _find_next_generated_user_id(txn):
txn.execute("SELECT name FROM users")
- regex = re.compile("^@(\d+):")
+ regex = re.compile(r"^@(\d+):")
found = set()
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 5623391f6e..158e9dbe7b 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -27,7 +27,7 @@ from ._base import SQLBaseStore
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index a3032cdce9..d8bf953ec0 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index f2bde74dc5..625aedc940 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -15,6 +15,8 @@
import logging
+from six import integer_types
+
from sortedcontainers import SortedDict
from synapse.util import caches
@@ -47,7 +49,7 @@ class StreamChangeCache(object):
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
"""
- assert type(stream_pos) is int or type(stream_pos) is long
+ assert type(stream_pos) in integer_types
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 89224b26cc..4c6e92beb8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
-from twisted.internet import defer
+from twisted.internet import defer, threads
logger = logging.getLogger(__name__)
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
return result
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
- "synapse.util.logcontext",
- "synapse.http.server",
- "synapse.storage._base",
- "synapse.util.async_helpers",
-]
+def defer_to_thread(reactor, f, *args, **kwargs):
+ """
+ Calls the function `f` using a thread from the reactor's default threadpool and
+ returns the result as a Deferred.
+
+ Creates a new logcontext for `f`, which is created as a child of the current
+ logcontext (so its CPU usage metrics will get attributed to the current
+ logcontext). `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred will be invoked, and whose threadpool we should use for the
+ function.
+
+ Normally this will be hs.get_reactor().
+
+ f (callable): The function to call.
+ args: positional arguments to pass to f.
-def logcontext_tracer(frame, event, arg):
- """A tracer that logs whenever a logcontext "unexpectedly" changes within
- a function. Probably inaccurate.
+ kwargs: keyword arguments to pass to f.
- Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
"""
- if event == 'call':
- name = frame.f_globals["__name__"]
- if name.startswith("synapse"):
- if name == "synapse.util.logcontext":
- if frame.f_code.co_name in ["__enter__", "__exit__"]:
- tracer = frame.f_back.f_trace
- if tracer:
- tracer.just_changed = True
-
- tracer = frame.f_trace
- if tracer:
- return tracer
-
- if not any(name.startswith(ig) for ig in _to_ignore):
- return LineTracer()
-
-
-class LineTracer(object):
- __slots__ = ["context", "just_changed"]
-
- def __init__(self):
- self.context = LoggingContext.current_context()
- self.just_changed = False
-
- def __call__(self, frame, event, arg):
- if event in 'line':
- if self.just_changed:
- self.context = LoggingContext.current_context()
- self.just_changed = False
- else:
- c = LoggingContext.current_context()
- if c != self.context:
- logger.info(
- "Context changed! %s -> %s, %s, %s",
- self.context, c,
- frame.f_code.co_filename, frame.f_lineno
- )
- self.context = c
+ return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
- return self
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+ """
+ A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+ logcontexts correctly.
+
+ Calls the function `f` using a thread from the given threadpool and returns
+ the result as a Deferred.
+
+ Creates a new logcontext for `f`, which is created as a child of the current
+ logcontext (so its CPU usage metrics will get attributed to the current
+ logcontext). `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+ threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+ running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+ f (callable): The function to call.
+
+ args: positional arguments to pass to f.
+
+ kwargs: keyword arguments to pass to f.
+
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
+ """
+ logcontext = LoggingContext.current_context()
+
+ def g():
+ with LoggingContext(parent_context=logcontext):
+ return f(*args, **kwargs)
+
+ return make_deferred_yieldable(
+ threads.deferToThreadPool(reactor, threadpool, g)
+ )
diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 8d0f2a8918..9cb7e9c9ab 100644
--- a/synapse/util/manhole.py
+++ b/synapse/util/manhole.py
@@ -70,6 +70,8 @@ def manhole(username, password, globals):
Returns:
twisted.internet.protocol.Factory: A factory to pass to ``listenTCP``
"""
+ if not isinstance(password, bytes):
+ password = password.encode('ascii')
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
**{username: password}
@@ -82,7 +84,7 @@ def manhole(username, password, globals):
)
factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
- factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY)
- factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY)
+ factory.publicKeys[b'ssh-rsa'] = Key.fromString(PUBLIC_KEY)
+ factory.privateKeys[b'ssh-rsa'] = Key.fromString(PRIVATE_KEY)
return factory
diff --git a/synctl b/synctl
index bb8cb084cc..7e79b05c39 100755
--- a/synctl
+++ b/synctl
@@ -76,8 +76,7 @@ def start(configfile):
try:
subprocess.check_call(args)
- write("started synapse.app.homeserver(%r)" %
- (configfile,), colour=GREEN)
+ write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
except subprocess.CalledProcessError as e:
write(
"error starting (exit code: %d); see above for logs" % e.returncode,
@@ -86,21 +85,15 @@ def start(configfile):
def start_worker(app, configfile, worker_configfile):
- args = [
- sys.executable, "-B",
- "-m", app,
- "-c", configfile,
- "-c", worker_configfile
- ]
+ args = [sys.executable, "-B", "-m", app, "-c", configfile, "-c", worker_configfile]
try:
subprocess.check_call(args)
write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
except subprocess.CalledProcessError as e:
write(
- "error starting %s(%r) (exit code: %d); see above for logs" % (
- app, worker_configfile, e.returncode,
- ),
+ "error starting %s(%r) (exit code: %d); see above for logs"
+ % (app, worker_configfile, e.returncode),
colour=RED,
)
@@ -120,9 +113,9 @@ def stop(pidfile, app):
abort("Cannot stop %s: Unknown error" % (app,))
-Worker = collections.namedtuple("Worker", [
- "app", "configfile", "pidfile", "cache_factor", "cache_factors",
-])
+Worker = collections.namedtuple(
+ "Worker", ["app", "configfile", "pidfile", "cache_factor", "cache_factors"]
+)
def main():
@@ -141,24 +134,20 @@ def main():
help="the homeserver config file, defaults to homeserver.yaml",
)
parser.add_argument(
- "-w", "--worker",
- metavar="WORKERCONFIG",
- help="start or stop a single worker",
+ "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker"
)
parser.add_argument(
- "-a", "--all-processes",
+ "-a",
+ "--all-processes",
metavar="WORKERCONFIGDIR",
help="start or stop all the workers in the given directory"
- " and the main synapse process",
+ " and the main synapse process",
)
options = parser.parse_args()
if options.worker and options.all_processes:
- write(
- 'Cannot use "--worker" with "--all-processes"',
- stream=sys.stderr
- )
+ write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr)
sys.exit(1)
configfile = options.configfile
@@ -167,9 +156,7 @@ def main():
write(
"No config file found\n"
"To generate a config file, run '%s -c %s --generate-config"
- " --server-name=<server name>'\n" % (
- " ".join(SYNAPSE), options.configfile
- ),
+ " --server-name=<server name>'\n" % (" ".join(SYNAPSE), options.configfile),
stream=sys.stderr,
)
sys.exit(1)
@@ -194,8 +181,7 @@ def main():
worker_configfile = options.worker
if not os.path.exists(worker_configfile):
write(
- "No worker config found at %r" % (worker_configfile,),
- stream=sys.stderr,
+ "No worker config found at %r" % (worker_configfile,), stream=sys.stderr
)
sys.exit(1)
worker_configfiles.append(worker_configfile)
@@ -211,9 +197,9 @@ def main():
stream=sys.stderr,
)
sys.exit(1)
- worker_configfiles.extend(sorted(glob.glob(
- os.path.join(worker_configdir, "*.yaml")
- )))
+ worker_configfiles.extend(
+ sorted(glob.glob(os.path.join(worker_configdir, "*.yaml")))
+ )
workers = []
for worker_configfile in worker_configfiles:
@@ -223,14 +209,12 @@ def main():
if worker_app == "synapse.app.homeserver":
# We need to special case all of this to pick up options that may
# be set in the main config file or in this worker config file.
- worker_pidfile = (
- worker_config.get("pid_file")
- or pidfile
+ worker_pidfile = worker_config.get("pid_file") or pidfile
+ worker_cache_factor = (
+ worker_config.get("synctl_cache_factor") or cache_factor
)
- worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
worker_cache_factors = (
- worker_config.get("synctl_cache_factors")
- or cache_factors
+ worker_config.get("synctl_cache_factors") or cache_factors
)
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
@@ -239,19 +223,27 @@ def main():
for key in worker_config:
if key == "worker_app": # But we allow worker_app
continue
- assert not key.startswith("worker_"), \
- "Main process cannot use worker_* config"
+ assert not key.startswith(
+ "worker_"
+ ), "Main process cannot use worker_* config"
else:
worker_pidfile = worker_config["worker_pid_file"]
worker_daemonize = worker_config["worker_daemonize"]
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
- worker_configfile, "worker_daemonize")
+ worker_configfile,
+ "worker_daemonize",
+ )
worker_cache_factor = worker_config.get("synctl_cache_factor")
worker_cache_factors = worker_config.get("synctl_cache_factors", {})
- workers.append(Worker(
- worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
- worker_cache_factors,
- ))
+ workers.append(
+ Worker(
+ worker_app,
+ worker_configfile,
+ worker_pidfile,
+ worker_cache_factor,
+ worker_cache_factors,
+ )
+ )
action = options.action
diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py
index 48b2d3d663..2a7044801a 100644
--- a/tests/api/test_filtering.py
+++ b/tests/api/test_filtering.py
@@ -60,7 +60,7 @@ class FilteringTestCase(unittest.TestCase):
invalid_filters = [
{"boom": {}},
{"account_data": "Hello World"},
- {"event_fields": ["\\foo"]},
+ {"event_fields": [r"\\foo"]},
{"room": {"timeline": {"limit": 0}, "state": {"not_bars": ["*"]}}},
{"event_format": "other"},
{"room": {"not_rooms": ["#foo:pik-test"]}},
@@ -109,6 +109,16 @@ class FilteringTestCase(unittest.TestCase):
"event_format": "client",
"event_fields": ["type", "content", "sender"],
},
+
+ # a single backslash should be permitted (though it is debatable whether
+ # it should be permitted before anything other than `.`, and what that
+ # actually means)
+ #
+ # (note that event_fields is implemented in
+ # synapse.events.utils.serialize_event, and so whether this actually works
+ # is tested elsewhere. We just want to check that it is allowed through the
+ # filter validation)
+ {"event_fields": [r"foo\.bar"]},
]
for filter in valid_filters:
try:
diff --git a/tests/config/test_generate.py b/tests/config/test_generate.py
index f88d28a19d..0c23068bcf 100644
--- a/tests/config/test_generate.py
+++ b/tests/config/test_generate.py
@@ -67,6 +67,6 @@ class ConfigGenerationTestCase(unittest.TestCase):
with open(log_config_file) as f:
config = f.read()
# find the 'filename' line
- matches = re.findall("^\s*filename:\s*(.*)$", config, re.M)
+ matches = re.findall(r"^\s*filename:\s*(.*)$", config, re.M)
self.assertEqual(1, len(matches))
self.assertEqual(matches[0], expected)
diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py
index ff217ca8b9..d0cc492deb 100644
--- a/tests/events/test_utils.py
+++ b/tests/events/test_utils.py
@@ -156,7 +156,7 @@ class SerializeEventTestCase(unittest.TestCase):
room_id="!foo:bar",
content={"key.with.dots": {}},
),
- ["content.key\.with\.dots"],
+ [r"content.key\.with\.dots"],
),
{"content": {"key.with.dots": {}}},
)
@@ -172,7 +172,7 @@ class SerializeEventTestCase(unittest.TestCase):
"nested.dot.key": {"leaf.key": 42, "not_me_either": 1},
},
),
- ["content.nested\.dot\.key.leaf\.key"],
+ [r"content.nested\.dot\.key.leaf\.key"],
),
{"content": {"nested.dot.key": {"leaf.key": 42}}},
)
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index 7b4ade3dfb..3e9a190727 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -19,7 +19,7 @@ from twisted.internet import defer
from synapse.api.errors import ResourceLimitError
from synapse.handlers.register import RegistrationHandler
-from synapse.types import UserID, create_requester
+from synapse.types import RoomAlias, UserID, create_requester
from tests.utils import setup_test_homeserver
@@ -41,30 +41,27 @@ class RegistrationTestCase(unittest.TestCase):
self.mock_captcha_client = Mock()
self.hs = yield setup_test_homeserver(
self.addCleanup,
- handlers=None,
- http_client=None,
expire_access_token=True,
- profile_handler=Mock(),
)
self.macaroon_generator = Mock(
generate_access_token=Mock(return_value='secret')
)
self.hs.get_macaroon_generator = Mock(return_value=self.macaroon_generator)
- self.hs.handlers = RegistrationHandlers(self.hs)
self.handler = self.hs.get_handlers().registration_handler
self.store = self.hs.get_datastore()
self.hs.config.max_mau_value = 50
self.lots_of_users = 100
self.small_number_of_users = 1
+ self.requester = create_requester("@requester:test")
+
@defer.inlineCallbacks
def test_user_is_created_and_logged_in_if_doesnt_exist(self):
- local_part = "someone"
- display_name = "someone"
- user_id = "@someone:test"
- requester = create_requester("@as:test")
+ frank = UserID.from_string("@frank:test")
+ user_id = frank.to_string()
+ requester = create_requester(user_id)
result_user_id, result_token = yield self.handler.get_or_create_user(
- requester, local_part, display_name
+ requester, frank.localpart, "Frankie"
)
self.assertEquals(result_user_id, user_id)
self.assertEquals(result_token, 'secret')
@@ -78,12 +75,11 @@ class RegistrationTestCase(unittest.TestCase):
token="jkv;g498752-43gj['eamb!-5",
password_hash=None,
)
- local_part = "frank"
- display_name = "Frank"
- user_id = "@frank:test"
- requester = create_requester("@as:test")
+ local_part = frank.localpart
+ user_id = frank.to_string()
+ requester = create_requester(user_id)
result_user_id, result_token = yield self.handler.get_or_create_user(
- requester, local_part, display_name
+ requester, local_part, None
)
self.assertEquals(result_user_id, user_id)
self.assertEquals(result_token, 'secret')
@@ -92,7 +88,7 @@ class RegistrationTestCase(unittest.TestCase):
def test_mau_limits_when_disabled(self):
self.hs.config.limit_usage_by_mau = False
# Ensure does not throw exception
- yield self.handler.get_or_create_user("requester", 'a', "display_name")
+ yield self.handler.get_or_create_user(self.requester, 'a', "display_name")
@defer.inlineCallbacks
def test_get_or_create_user_mau_not_blocked(self):
@@ -101,7 +97,7 @@ class RegistrationTestCase(unittest.TestCase):
return_value=defer.succeed(self.hs.config.max_mau_value - 1)
)
# Ensure does not throw exception
- yield self.handler.get_or_create_user("@user:server", 'c', "User")
+ yield self.handler.get_or_create_user(self.requester, 'c', "User")
@defer.inlineCallbacks
def test_get_or_create_user_mau_blocked(self):
@@ -110,13 +106,13 @@ class RegistrationTestCase(unittest.TestCase):
return_value=defer.succeed(self.lots_of_users)
)
with self.assertRaises(ResourceLimitError):
- yield self.handler.get_or_create_user("requester", 'b', "display_name")
+ yield self.handler.get_or_create_user(self.requester, 'b', "display_name")
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.hs.config.max_mau_value)
)
with self.assertRaises(ResourceLimitError):
- yield self.handler.get_or_create_user("requester", 'b', "display_name")
+ yield self.handler.get_or_create_user(self.requester, 'b', "display_name")
@defer.inlineCallbacks
def test_register_mau_blocked(self):
@@ -147,3 +143,44 @@ class RegistrationTestCase(unittest.TestCase):
)
with self.assertRaises(ResourceLimitError):
yield self.handler.register_saml2(localpart="local_part")
+
+ @defer.inlineCallbacks
+ def test_auto_create_auto_join_rooms(self):
+ room_alias_str = "#room:test"
+ self.hs.config.auto_join_rooms = [room_alias_str]
+ res = yield self.handler.register(localpart='jeff')
+ rooms = yield self.store.get_rooms_for_user(res[0])
+
+ directory_handler = self.hs.get_handlers().directory_handler
+ room_alias = RoomAlias.from_string(room_alias_str)
+ room_id = yield directory_handler.get_association(room_alias)
+
+ self.assertTrue(room_id['room_id'] in rooms)
+ self.assertEqual(len(rooms), 1)
+
+ @defer.inlineCallbacks
+ def test_auto_create_auto_join_rooms_with_no_rooms(self):
+ self.hs.config.auto_join_rooms = []
+ frank = UserID.from_string("@frank:test")
+ res = yield self.handler.register(frank.localpart)
+ self.assertEqual(res[0], frank.to_string())
+ rooms = yield self.store.get_rooms_for_user(res[0])
+ self.assertEqual(len(rooms), 0)
+
+ @defer.inlineCallbacks
+ def test_auto_create_auto_join_where_room_is_another_domain(self):
+ self.hs.config.auto_join_rooms = ["#room:another"]
+ frank = UserID.from_string("@frank:test")
+ res = yield self.handler.register(frank.localpart)
+ self.assertEqual(res[0], frank.to_string())
+ rooms = yield self.store.get_rooms_for_user(res[0])
+ self.assertEqual(len(rooms), 0)
+
+ @defer.inlineCallbacks
+ def test_auto_create_auto_join_where_auto_create_is_false(self):
+ self.hs.config.autocreate_auto_join_rooms = False
+ room_alias_str = "#room:test"
+ self.hs.config.auto_join_rooms = [room_alias_str]
+ res = yield self.handler.register(localpart='jeff')
+ rooms = yield self.store.get_rooms_for_user(res[0])
+ self.assertEqual(len(rooms), 0)
diff --git a/tests/state/__init__.py b/tests/state/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/state/__init__.py
diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py
new file mode 100644
index 0000000000..efd85ebe6c
--- /dev/null
+++ b/tests/state/test_v2.py
@@ -0,0 +1,663 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+
+from six.moves import zip
+
+import attr
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.event_auth import auth_types_for_event
+from synapse.events import FrozenEvent
+from synapse.state.v2 import lexicographical_topological_sort, resolve_events_with_store
+from synapse.types import EventID
+
+from tests import unittest
+
+ALICE = "@alice:example.com"
+BOB = "@bob:example.com"
+CHARLIE = "@charlie:example.com"
+EVELYN = "@evelyn:example.com"
+ZARA = "@zara:example.com"
+
+ROOM_ID = "!test:example.com"
+
+MEMBERSHIP_CONTENT_JOIN = {"membership": Membership.JOIN}
+MEMBERSHIP_CONTENT_BAN = {"membership": Membership.BAN}
+
+
+ORIGIN_SERVER_TS = 0
+
+
+class FakeEvent(object):
+ """A fake event we use as a convenience.
+
+ NOTE: Again as a convenience we use "node_ids" rather than event_ids to
+ refer to events. The event_id has node_id as localpart and example.com
+ as domain.
+ """
+ def __init__(self, id, sender, type, state_key, content):
+ self.node_id = id
+ self.event_id = EventID(id, "example.com").to_string()
+ self.sender = sender
+ self.type = type
+ self.state_key = state_key
+ self.content = content
+
+ def to_event(self, auth_events, prev_events):
+ """Given the auth_events and prev_events, convert to a Frozen Event
+
+ Args:
+ auth_events (list[str]): list of event_ids
+ prev_events (list[str]): list of event_ids
+
+ Returns:
+ FrozenEvent
+ """
+ global ORIGIN_SERVER_TS
+
+ ts = ORIGIN_SERVER_TS
+ ORIGIN_SERVER_TS = ORIGIN_SERVER_TS + 1
+
+ event_dict = {
+ "auth_events": [(a, {}) for a in auth_events],
+ "prev_events": [(p, {}) for p in prev_events],
+ "event_id": self.node_id,
+ "sender": self.sender,
+ "type": self.type,
+ "content": self.content,
+ "origin_server_ts": ts,
+ "room_id": ROOM_ID,
+ }
+
+ if self.state_key is not None:
+ event_dict["state_key"] = self.state_key
+
+ return FrozenEvent(event_dict)
+
+
+# All graphs start with this set of events
+INITIAL_EVENTS = [
+ FakeEvent(
+ id="CREATE",
+ sender=ALICE,
+ type=EventTypes.Create,
+ state_key="",
+ content={"creator": ALICE},
+ ),
+ FakeEvent(
+ id="IMA",
+ sender=ALICE,
+ type=EventTypes.Member,
+ state_key=ALICE,
+ content=MEMBERSHIP_CONTENT_JOIN,
+ ),
+ FakeEvent(
+ id="IPOWER",
+ sender=ALICE,
+ type=EventTypes.PowerLevels,
+ state_key="",
+ content={"users": {ALICE: 100}},
+ ),
+ FakeEvent(
+ id="IJR",
+ sender=ALICE,
+ type=EventTypes.JoinRules,
+ state_key="",
+ content={"join_rule": JoinRules.PUBLIC},
+ ),
+ FakeEvent(
+ id="IMB",
+ sender=BOB,
+ type=EventTypes.Member,
+ state_key=BOB,
+ content=MEMBERSHIP_CONTENT_JOIN,
+ ),
+ FakeEvent(
+ id="IMC",
+ sender=CHARLIE,
+ type=EventTypes.Member,
+ state_key=CHARLIE,
+ content=MEMBERSHIP_CONTENT_JOIN,
+ ),
+ FakeEvent(
+ id="IMZ",
+ sender=ZARA,
+ type=EventTypes.Member,
+ state_key=ZARA,
+ content=MEMBERSHIP_CONTENT_JOIN,
+ ),
+ FakeEvent(
+ id="START",
+ sender=ZARA,
+ type=EventTypes.Message,
+ state_key=None,
+ content={},
+ ),
+ FakeEvent(
+ id="END",
+ sender=ZARA,
+ type=EventTypes.Message,
+ state_key=None,
+ content={},
+ ),
+]
+
+INITIAL_EDGES = [
+ "START", "IMZ", "IMC", "IMB", "IJR", "IPOWER", "IMA", "CREATE",
+]
+
+
+class StateTestCase(unittest.TestCase):
+ def test_ban_vs_pl(self):
+ events = [
+ FakeEvent(
+ id="PA",
+ sender=ALICE,
+ type=EventTypes.PowerLevels,
+ state_key="",
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ }
+ },
+ ),
+ FakeEvent(
+ id="MA",
+ sender=ALICE,
+ type=EventTypes.Member,
+ state_key=ALICE,
+ content={"membership": Membership.JOIN},
+ ),
+ FakeEvent(
+ id="MB",
+ sender=ALICE,
+ type=EventTypes.Member,
+ state_key=BOB,
+ content={"membership": Membership.BAN},
+ ),
+ FakeEvent(
+ id="PB",
+ sender=BOB,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ },
+ },
+ ),
+ ]
+
+ edges = [
+ ["END", "MB", "MA", "PA", "START"],
+ ["END", "PB", "PA"],
+ ]
+
+ expected_state_ids = ["PA", "MA", "MB"]
+
+ self.do_check(events, edges, expected_state_ids)
+
+ def test_join_rule_evasion(self):
+ events = [
+ FakeEvent(
+ id="JR",
+ sender=ALICE,
+ type=EventTypes.JoinRules,
+ state_key="",
+ content={"join_rules": JoinRules.PRIVATE},
+ ),
+ FakeEvent(
+ id="ME",
+ sender=EVELYN,
+ type=EventTypes.Member,
+ state_key=EVELYN,
+ content={"membership": Membership.JOIN},
+ ),
+ ]
+
+ edges = [
+ ["END", "JR", "START"],
+ ["END", "ME", "START"],
+ ]
+
+ expected_state_ids = ["JR"]
+
+ self.do_check(events, edges, expected_state_ids)
+
+ def test_offtopic_pl(self):
+ events = [
+ FakeEvent(
+ id="PA",
+ sender=ALICE,
+ type=EventTypes.PowerLevels,
+ state_key="",
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ }
+ },
+ ),
+ FakeEvent(
+ id="PB",
+ sender=BOB,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ CHARLIE: 50,
+ },
+ },
+ ),
+ FakeEvent(
+ id="PC",
+ sender=CHARLIE,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ CHARLIE: 0,
+ },
+ },
+ ),
+ ]
+
+ edges = [
+ ["END", "PC", "PB", "PA", "START"],
+ ["END", "PA"],
+ ]
+
+ expected_state_ids = ["PC"]
+
+ self.do_check(events, edges, expected_state_ids)
+
+ def test_topic_basic(self):
+ events = [
+ FakeEvent(
+ id="T1",
+ sender=ALICE,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ FakeEvent(
+ id="PA1",
+ sender=ALICE,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ },
+ },
+ ),
+ FakeEvent(
+ id="T2",
+ sender=ALICE,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ FakeEvent(
+ id="PA2",
+ sender=ALICE,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 0,
+ },
+ },
+ ),
+ FakeEvent(
+ id="PB",
+ sender=BOB,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ },
+ },
+ ),
+ FakeEvent(
+ id="T3",
+ sender=BOB,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ ]
+
+ edges = [
+ ["END", "PA2", "T2", "PA1", "T1", "START"],
+ ["END", "T3", "PB", "PA1"],
+ ]
+
+ expected_state_ids = ["PA2", "T2"]
+
+ self.do_check(events, edges, expected_state_ids)
+
+ def test_topic_reset(self):
+ events = [
+ FakeEvent(
+ id="T1",
+ sender=ALICE,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ FakeEvent(
+ id="PA",
+ sender=ALICE,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ },
+ },
+ ),
+ FakeEvent(
+ id="T2",
+ sender=BOB,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ FakeEvent(
+ id="MB",
+ sender=ALICE,
+ type=EventTypes.Member,
+ state_key=BOB,
+ content={"membership": Membership.BAN},
+ ),
+ ]
+
+ edges = [
+ ["END", "MB", "T2", "PA", "T1", "START"],
+ ["END", "T1"],
+ ]
+
+ expected_state_ids = ["T1", "MB", "PA"]
+
+ self.do_check(events, edges, expected_state_ids)
+
+ def test_topic(self):
+ events = [
+ FakeEvent(
+ id="T1",
+ sender=ALICE,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ FakeEvent(
+ id="PA1",
+ sender=ALICE,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ },
+ },
+ ),
+ FakeEvent(
+ id="T2",
+ sender=ALICE,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ FakeEvent(
+ id="PA2",
+ sender=ALICE,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 0,
+ },
+ },
+ ),
+ FakeEvent(
+ id="PB",
+ sender=BOB,
+ type=EventTypes.PowerLevels,
+ state_key='',
+ content={
+ "users": {
+ ALICE: 100,
+ BOB: 50,
+ },
+ },
+ ),
+ FakeEvent(
+ id="T3",
+ sender=BOB,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ FakeEvent(
+ id="MZ1",
+ sender=ZARA,
+ type=EventTypes.Message,
+ state_key=None,
+ content={},
+ ),
+ FakeEvent(
+ id="T4",
+ sender=ALICE,
+ type=EventTypes.Topic,
+ state_key="",
+ content={},
+ ),
+ ]
+
+ edges = [
+ ["END", "T4", "MZ1", "PA2", "T2", "PA1", "T1", "START"],
+ ["END", "MZ1", "T3", "PB", "PA1"],
+ ]
+
+ expected_state_ids = ["T4", "PA2"]
+
+ self.do_check(events, edges, expected_state_ids)
+
+ def do_check(self, events, edges, expected_state_ids):
+ """Take a list of events and edges and calculate the state of the
+ graph at END, and asserts it matches `expected_state_ids`
+
+ Args:
+ events (list[FakeEvent])
+ edges (list[list[str]]): A list of chains of event edges, e.g.
+ `[[A, B, C]]` are edges A->B and B->C.
+ expected_state_ids (list[str]): The expected state at END, (excluding
+ the keys that haven't changed since START).
+ """
+ # We want to sort the events into topological order for processing.
+ graph = {}
+
+ # node_id -> FakeEvent
+ fake_event_map = {}
+
+ for ev in itertools.chain(INITIAL_EVENTS, events):
+ graph[ev.node_id] = set()
+ fake_event_map[ev.node_id] = ev
+
+ for a, b in pairwise(INITIAL_EDGES):
+ graph[a].add(b)
+
+ for edge_list in edges:
+ for a, b in pairwise(edge_list):
+ graph[a].add(b)
+
+ # event_id -> FrozenEvent
+ event_map = {}
+ # node_id -> state
+ state_at_event = {}
+
+ # We copy the map as the sort consumes the graph
+ graph_copy = {k: set(v) for k, v in graph.items()}
+
+ for node_id in lexicographical_topological_sort(graph_copy, key=lambda e: e):
+ fake_event = fake_event_map[node_id]
+ event_id = fake_event.event_id
+
+ prev_events = list(graph[node_id])
+
+ if len(prev_events) == 0:
+ state_before = {}
+ elif len(prev_events) == 1:
+ state_before = dict(state_at_event[prev_events[0]])
+ else:
+ state_d = resolve_events_with_store(
+ [state_at_event[n] for n in prev_events],
+ event_map=event_map,
+ state_res_store=TestStateResolutionStore(event_map),
+ )
+
+ self.assertTrue(state_d.called)
+ state_before = state_d.result
+
+ state_after = dict(state_before)
+ if fake_event.state_key is not None:
+ state_after[(fake_event.type, fake_event.state_key)] = event_id
+
+ auth_types = set(auth_types_for_event(fake_event))
+
+ auth_events = []
+ for key in auth_types:
+ if key in state_before:
+ auth_events.append(state_before[key])
+
+ event = fake_event.to_event(auth_events, prev_events)
+
+ state_at_event[node_id] = state_after
+ event_map[event_id] = event
+
+ expected_state = {}
+ for node_id in expected_state_ids:
+ # expected_state_ids are node IDs rather than event IDs,
+ # so we have to convert
+ event_id = EventID(node_id, "example.com").to_string()
+ event = event_map[event_id]
+
+ key = (event.type, event.state_key)
+
+ expected_state[key] = event_id
+
+ start_state = state_at_event["START"]
+ end_state = {
+ key: value
+ for key, value in state_at_event["END"].items()
+ if key in expected_state or start_state.get(key) != value
+ }
+
+ self.assertEqual(expected_state, end_state)
+
+
+class LexicographicalTestCase(unittest.TestCase):
+ def test_simple(self):
+ graph = {
+ "l": {"o"},
+ "m": {"n", "o"},
+ "n": {"o"},
+ "o": set(),
+ "p": {"o"},
+ }
+
+ res = list(lexicographical_topological_sort(graph, key=lambda x: x))
+
+ self.assertEqual(["o", "l", "n", "m", "p"], res)
+
+
+def pairwise(iterable):
+ "s -> (s0,s1), (s1,s2), (s2, s3), ..."
+ a, b = itertools.tee(iterable)
+ next(b, None)
+ return zip(a, b)
+
+
+@attr.s
+class TestStateResolutionStore(object):
+ event_map = attr.ib()
+
+ def get_events(self, event_ids, allow_rejected=False):
+ """Get events from the database
+
+ Args:
+ event_ids (list): The event_ids of the events to fetch
+ allow_rejected (bool): If True return rejected events.
+
+ Returns:
+ Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
+ """
+
+ return {
+ eid: self.event_map[eid]
+ for eid in event_ids
+ if eid in self.event_map
+ }
+
+ def get_auth_chain(self, event_ids):
+ """Gets the full auth chain for a set of events (including rejected
+ events).
+
+ Includes the given event IDs in the result.
+
+ Note that:
+ 1. All events must be state events.
+ 2. For v1 rooms this may not have the full auth chain in the
+ presence of rejected events
+
+ Args:
+ event_ids (list): The event IDs of the events to fetch the auth
+ chain for. Must be state events.
+
+ Returns:
+ Deferred[list[str]]: List of event IDs of the auth chain.
+ """
+
+ # Simple DFS for auth chain
+ result = set()
+ stack = list(event_ids)
+ while stack:
+ event_id = stack.pop()
+ if event_id in result:
+ continue
+
+ result.add(event_id)
+
+ event = self.event_map[event_id]
+ for aid, _ in event.auth_events:
+ stack.append(aid)
+
+ return list(result)
diff --git a/tests/utils.py b/tests/utils.py
index dd347a0c59..565bb60d08 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -124,6 +124,7 @@ def default_config(name):
config.user_consent_server_notice_content = None
config.block_events_without_consent_error = None
config.media_storage_providers = []
+ config.autocreate_auto_join_rooms = True
config.auto_join_rooms = []
config.limit_usage_by_mau = False
config.hs_disabled = False
diff --git a/tox.ini b/tox.ini
index 87b5e4782d..9de5a5704a 100644
--- a/tox.ini
+++ b/tox.ini
@@ -3,7 +3,6 @@ envlist = packaging, py27, py36, pep8, check_isort
[base]
deps =
- coverage
Twisted>=17.1
mock
python-subunit
@@ -26,9 +25,7 @@ passenv = *
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
- coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
- "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
- {env:DUMP_COVERAGE_COMMAND:coverage report -m}
+ "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
[testenv:py27]
@@ -108,10 +105,10 @@ commands =
[testenv:pep8]
skip_install = True
-basepython = python2.7
+basepython = python3.6
deps =
flake8
-commands = /bin/sh -c "flake8 synapse tests {env:PEP8SUFFIX:}"
+commands = /bin/sh -c "flake8 synapse tests scripts scripts-dev scripts/register_new_matrix_user scripts/synapse_port_db synctl {env:PEP8SUFFIX:}"
[testenv:check_isort]
skip_install = True
|