diff --git a/synapse/__init__.py b/synapse/__init__.py
index 9dbe0b9f10..b1f7a89fba 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
except ImportError:
pass
-__version__ = "0.33.4"
+__version__ = "0.33.5.1"
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index a31a9a17e0..eed8c67e6a 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -226,7 +226,7 @@ class Filtering(object):
jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
format_checker=FormatChecker())
except jsonschema.ValidationError as e:
- raise SynapseError(400, e.message)
+ raise SynapseError(400, str(e))
class FilterCollection(object):
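The `e.message` → `str(e)` changes repeated throughout this patch are Python 3 compatibility fixes: `BaseException.message` was deprecated in Python 2.6 and removed in Python 3, so the attribute only exists where an exception class defines it explicitly. A minimal illustration (not part of the patch):

```python
# BaseException.message is gone in Python 3; str(e) works on both 2 and 3.
try:
    raise ValueError("something went wrong")
except ValueError as e:
    # print(e.message)  # AttributeError on Python 3
    print(str(e))       # prints "something went wrong" on both
```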
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 3b6b9368b8..c3afcc573b 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -24,7 +24,7 @@ try:
python_dependencies.check_requirements()
except python_dependencies.MissingRequirementError as e:
message = "\n".join([
- "Missing Requirement: %s" % (e.message,),
+ "Missing Requirement: %s" % (str(e),),
"To install run:",
" pip install --upgrade --force \"%s\"" % (e.dependency,),
"",
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 86b5067400..8559e141af 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -136,7 +136,7 @@ def start(config_options):
"Synapse appservice", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.appservice"
@@ -172,7 +172,6 @@ def start(config_options):
def start():
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ce2b113dbb..76aed8c60a 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -153,7 +153,7 @@ def start(config_options):
"Synapse client reader", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.client_reader"
@@ -181,7 +181,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index f98e456ea0..9060ab14f6 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -169,7 +169,7 @@ def start(config_options):
"Synapse event creator", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.event_creator"
@@ -199,7 +199,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 60f5973505..228a297fb8 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -140,7 +140,7 @@ def start(config_options):
"Synapse federation reader", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_reader"
@@ -168,7 +168,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 60dd09aac3..e9a99d76e1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -160,7 +160,7 @@ def start(config_options):
"Synapse federation sender", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_sender"
@@ -201,7 +201,6 @@ def start(config_options):
def start():
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-federation-sender", config)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 8c0b9c67b0..fc4b25de1c 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -228,7 +228,7 @@ def start(config_options):
"Synapse frontend proxy", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.frontend_proxy"
@@ -258,7 +258,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 3241ded188..a98fdbd210 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer):
try:
database_engine.check_database(db_conn.cursor())
except IncorrectDatabaseSetup as e:
- quit_with_error(e.message)
+ quit_with_error(str(e))
# Gauges to expose monthly active user control metrics
@@ -328,7 +328,7 @@ def setup(config_options):
config_options,
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
if not config:
@@ -384,7 +384,6 @@ def setup(config_options):
def start():
hs.get_pusherpool().start()
- hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates()
hs.get_federation_client().start_get_pdu_cache()
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index e3dbb3b4e6..acc0487adc 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -133,7 +133,7 @@ def start(config_options):
"Synapse media repository", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.media_repository"
@@ -168,7 +168,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 244c604de9..630dcda478 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -191,7 +191,7 @@ def start(config_options):
"Synapse pusher", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.pusher"
@@ -228,7 +228,6 @@ def start(config_options):
def start():
ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 6662340797..9a7fc6ee9d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -410,7 +410,7 @@ def start(config_options):
"Synapse synchrotron", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.synchrotron"
@@ -435,7 +435,6 @@ def start(config_options):
def start():
ss.get_datastore().start_profiling()
- ss.get_state_handler().start_caching()
reactor.callWhenRunning(start)
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
deleted file mode 100755
index d658f967ba..0000000000
--- a/synapse/app/synctl.py
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import argparse
-import collections
-import errno
-import glob
-import os
-import os.path
-import signal
-import subprocess
-import sys
-import time
-
-from six import iteritems
-
-import yaml
-
-SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
-
-GREEN = "\x1b[1;32m"
-YELLOW = "\x1b[1;33m"
-RED = "\x1b[1;31m"
-NORMAL = "\x1b[m"
-
-
-def pid_running(pid):
- try:
- os.kill(pid, 0)
- return True
- except OSError as err:
- if err.errno == errno.EPERM:
- return True
- return False
-
-
-def write(message, colour=NORMAL, stream=sys.stdout):
- if colour == NORMAL:
- stream.write(message + "\n")
- else:
- stream.write(colour + message + NORMAL + "\n")
-
-
-def abort(message, colour=RED, stream=sys.stderr):
- write(message, colour, stream)
- sys.exit(1)
-
-
-def start(configfile):
- write("Starting ...")
- args = SYNAPSE
- args.extend(["--daemonize", "-c", configfile])
-
- try:
- subprocess.check_call(args)
- write("started synapse.app.homeserver(%r)" %
- (configfile,), colour=GREEN)
- except subprocess.CalledProcessError as e:
- write(
- "error starting (exit code: %d); see above for logs" % e.returncode,
- colour=RED,
- )
-
-
-def start_worker(app, configfile, worker_configfile):
- args = [
- "python", "-B",
- "-m", app,
- "-c", configfile,
- "-c", worker_configfile
- ]
-
- try:
- subprocess.check_call(args)
- write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
- except subprocess.CalledProcessError as e:
- write(
- "error starting %s(%r) (exit code: %d); see above for logs" % (
- app, worker_configfile, e.returncode,
- ),
- colour=RED,
- )
-
-
-def stop(pidfile, app):
- if os.path.exists(pidfile):
- pid = int(open(pidfile).read())
- try:
- os.kill(pid, signal.SIGTERM)
- write("stopped %s" % (app,), colour=GREEN)
- except OSError as err:
- if err.errno == errno.ESRCH:
- write("%s not running" % (app,), colour=YELLOW)
- elif err.errno == errno.EPERM:
- abort("Cannot stop %s: Operation not permitted" % (app,))
- else:
- abort("Cannot stop %s: Unknown error" % (app,))
-
-
-Worker = collections.namedtuple("Worker", [
- "app", "configfile", "pidfile", "cache_factor"
-])
-
-
-def main():
-
- parser = argparse.ArgumentParser()
-
- parser.add_argument(
- "action",
- choices=["start", "stop", "restart"],
- help="whether to start, stop or restart the synapse",
- )
- parser.add_argument(
- "configfile",
- nargs="?",
- default="homeserver.yaml",
- help="the homeserver config file, defaults to homeserver.yaml",
- )
- parser.add_argument(
- "-w", "--worker",
- metavar="WORKERCONFIG",
- help="start or stop a single worker",
- )
- parser.add_argument(
- "-a", "--all-processes",
- metavar="WORKERCONFIGDIR",
- help="start or stop all the workers in the given directory"
- " and the main synapse process",
- )
-
- options = parser.parse_args()
-
- if options.worker and options.all_processes:
- write(
- 'Cannot use "--worker" with "--all-processes"',
- stream=sys.stderr
- )
- sys.exit(1)
-
- configfile = options.configfile
-
- if not os.path.exists(configfile):
- write(
- "No config file found\n"
- "To generate a config file, run '%s -c %s --generate-config"
- " --server-name=<server name>'\n" % (
- " ".join(SYNAPSE), options.configfile
- ),
- stream=sys.stderr,
- )
- sys.exit(1)
-
- with open(configfile) as stream:
- config = yaml.load(stream)
-
- pidfile = config["pid_file"]
- cache_factor = config.get("synctl_cache_factor")
- start_stop_synapse = True
-
- if cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
-
- cache_factors = config.get("synctl_cache_factors", {})
- for cache_name, factor in iteritems(cache_factors):
- os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
-
- worker_configfiles = []
- if options.worker:
- start_stop_synapse = False
- worker_configfile = options.worker
- if not os.path.exists(worker_configfile):
- write(
- "No worker config found at %r" % (worker_configfile,),
- stream=sys.stderr,
- )
- sys.exit(1)
- worker_configfiles.append(worker_configfile)
-
- if options.all_processes:
- # To start the main synapse with -a you need to add a worker file
- # with worker_app == "synapse.app.homeserver"
- start_stop_synapse = False
- worker_configdir = options.all_processes
- if not os.path.isdir(worker_configdir):
- write(
- "No worker config directory found at %r" % (worker_configdir,),
- stream=sys.stderr,
- )
- sys.exit(1)
- worker_configfiles.extend(sorted(glob.glob(
- os.path.join(worker_configdir, "*.yaml")
- )))
-
- workers = []
- for worker_configfile in worker_configfiles:
- with open(worker_configfile) as stream:
- worker_config = yaml.load(stream)
- worker_app = worker_config["worker_app"]
- if worker_app == "synapse.app.homeserver":
- # We need to special case all of this to pick up options that may
- # be set in the main config file or in this worker config file.
- worker_pidfile = (
- worker_config.get("pid_file")
- or pidfile
- )
- worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
- daemonize = worker_config.get("daemonize") or config.get("daemonize")
- assert daemonize, "Main process must have daemonize set to true"
-
- # The master process doesn't support using worker_* config.
- for key in worker_config:
- if key == "worker_app": # But we allow worker_app
- continue
- assert not key.startswith("worker_"), \
- "Main process cannot use worker_* config"
- else:
- worker_pidfile = worker_config["worker_pid_file"]
- worker_daemonize = worker_config["worker_daemonize"]
- assert worker_daemonize, "In config %r: expected '%s' to be True" % (
- worker_configfile, "worker_daemonize")
- worker_cache_factor = worker_config.get("synctl_cache_factor")
- workers.append(Worker(
- worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
- ))
-
- action = options.action
-
- if action == "stop" or action == "restart":
- for worker in workers:
- stop(worker.pidfile, worker.app)
-
- if start_stop_synapse:
- stop(pidfile, "synapse.app.homeserver")
-
- # Wait for synapse to actually shutdown before starting it again
- if action == "restart":
- running_pids = []
- if start_stop_synapse and os.path.exists(pidfile):
- running_pids.append(int(open(pidfile).read()))
- for worker in workers:
- if os.path.exists(worker.pidfile):
- running_pids.append(int(open(worker.pidfile).read()))
- if len(running_pids) > 0:
- write("Waiting for process to exit before restarting...")
- for running_pid in running_pids:
- while pid_running(running_pid):
- time.sleep(0.2)
- write("All processes exited; now restarting...")
-
- if action == "start" or action == "restart":
- if start_stop_synapse:
- # Check if synapse is already running
- if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
- abort("synapse.app.homeserver already running")
- start(configfile)
-
- for worker in workers:
- if worker.cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
-
- start_worker(worker.app, configfile, worker.configfile)
-
- if cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
- else:
- os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
-
-
-if __name__ == "__main__":
- main()
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 96ffcaf073..0a5f62b509 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -188,7 +188,7 @@ def start(config_options):
"Synapse user directory", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.user_dir"
@@ -229,7 +229,6 @@ def start(config_options):
def start():
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 58c97a70af..8fccf573ee 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -25,7 +25,7 @@ if __name__ == "__main__":
try:
config = HomeServerConfig.load_config("", sys.argv[3:])
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
print (getattr(config, key))
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index fe67b2ff42..d05ed91d64 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -66,6 +66,14 @@ class FederationClient(FederationBase):
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
+ self._get_pdu_cache = ExpiringCache(
+ cache_name="get_pdu_cache",
+ clock=self._clock,
+ max_len=1000,
+ expiry_ms=120 * 1000,
+ reset_expiry_on_get=False,
+ )
+
def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
now = self._clock.time_msec()
@@ -82,17 +90,6 @@ class FederationClient(FederationBase):
if destination_dict:
self.pdu_destination_tried[event_id] = destination_dict
- def start_get_pdu_cache(self):
- self._get_pdu_cache = ExpiringCache(
- cache_name="get_pdu_cache",
- clock=self._clock,
- max_len=1000,
- expiry_ms=120 * 1000,
- reset_expiry_on_get=False,
- )
-
- self._get_pdu_cache.start()
-
@log_function
def make_query(self, destination, query_type, args,
retry_on_dns_fail=False, ignore_backoff=False):
@@ -212,8 +209,6 @@ class FederationClient(FederationBase):
Will attempt to get the PDU from each destination in the list until
one succeeds.
- This will persist the PDU locally upon receipt.
-
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
@@ -229,10 +224,9 @@ class FederationClient(FederationBase):
# TODO: Rate limit the number of times we try and get the same event.
- if self._get_pdu_cache:
- ev = self._get_pdu_cache.get(event_id)
- if ev:
- defer.returnValue(ev)
+ ev = self._get_pdu_cache.get(event_id)
+ if ev:
+ defer.returnValue(ev)
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
@@ -285,7 +279,7 @@ class FederationClient(FederationBase):
)
continue
- if self._get_pdu_cache is not None and signed_pdu:
+ if signed_pdu:
self._get_pdu_cache[event_id] = signed_pdu
defer.returnValue(signed_pdu)
@@ -293,8 +287,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_state_for_room(self, destination, room_id, event_id):
- """Requests all of the `current` state PDUs for a given room from
- a remote home server.
+ """Requests all of the room state at a given event from a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
@@ -302,9 +295,10 @@ class FederationClient(FederationBase):
event_id (str): The id of the event we want the state at.
Returns:
- Deferred: Results in a list of PDUs.
+ Deferred[Tuple[List[EventBase], List[EventBase]]]:
+ A list of events in the state, and a list of events in the auth chain
+ for the given event.
"""
-
try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.
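With the cache now constructed unconditionally in `__init__`, `get_pdu` no longer needs to guard on `self._get_pdu_cache` being set. A rough sketch of the resulting flow, with a plain dict standing in for `ExpiringCache` and simplified, illustrative names (not synapse's actual API):

```python
# Sketch of the get_pdu flow after this change. A plain dict stands in for
# ExpiringCache(cache_name="get_pdu_cache", max_len=1000, expiry_ms=120000).
class FederationClientSketch(object):
    def __init__(self):
        self._get_pdu_cache = {}

    def get_pdu(self, destinations, event_id, fetch_from):
        ev = self._get_pdu_cache.get(event_id)
        if ev:
            return ev  # cache hit: no federation round trip
        for destination in destinations:
            signed_pdu = fetch_from(destination, event_id)
            if signed_pdu:
                self._get_pdu_cache[event_id] = signed_pdu
                return signed_pdu
        return None

client = FederationClientSketch()
pdu = client.get_pdu(
    ["remote.example.com"],
    "$event:example.com",
    fetch_from=lambda dest, eid: {"event_id": eid, "origin": dest},
)
```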
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index dbee404ea7..819e8f7331 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- try:
- yield self._handle_received_pdu(
- origin, pdu
- )
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warn("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s: %s",
- event_id, f.getTraceback().rstrip(),
- )
+ with nested_logging_context(event_id):
+ try:
+ yield self._handle_received_pdu(
+ origin, pdu
+ )
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s: %s",
+ event_id, f.getTraceback().rstrip(),
+ )
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
@@ -618,7 +620,7 @@ class FederationServer(FederationBase):
)
yield self.handler.on_receive_pdu(
- origin, pdu, get_missing=True, sent_to_us_directly=True,
+ origin, pdu, sent_to_us_directly=True,
)
def __str__(self):
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 578e9250fb..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -341,7 +341,7 @@ class E2eKeysHandler(object):
def _exception_to_failure(e):
if isinstance(e, CodeMessageException):
return {
- "status": e.code, "message": e.message,
+ "status": e.code, "message": str(e),
}
if isinstance(e, NotRetryingDestination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f10b46414b..38bebbf598 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,6 +69,27 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+def shortstr(iterable, maxitems=5):
+ """If iterable has maxitems or fewer, return the stringification of a list
+ containing those items.
+
+    Otherwise, return the stringification of a list with the first maxitems items,
+ followed by "...".
+
+ Args:
+ iterable (Iterable): iterable to truncate
+ maxitems (int): number of items to return before truncating
+
+ Returns:
+ unicode
+ """
+
+ items = list(itertools.islice(iterable, maxitems + 1))
+ if len(items) <= maxitems:
+ return str(items)
+ return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
@@ -114,9 +135,8 @@ class FederationHandler(BaseHandler):
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@defer.inlineCallbacks
- @log_function
def on_receive_pdu(
- self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ self, origin, pdu, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -125,14 +145,23 @@ class FederationHandler(BaseHandler):
origin (str): server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
pdu (FrozenEvent): received PDU
- get_missing (bool): True if we should fetch missing prev_events
+ sent_to_us_directly (bool): True if this event was pushed to us; False if
+ we pulled it as the result of a missing prev_event.
Returns (Deferred): completes with None
"""
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
+ logger.info(
+ "[%s %s] handling received PDU: %s",
+ room_id, event_id, pdu,
+ )
+
# We reprocess pdus when we have seen them only as outliers
existing = yield self.store.get_event(
- pdu.event_id,
+ event_id,
allow_none=True,
allow_rejected=True,
)
@@ -147,7 +176,7 @@ class FederationHandler(BaseHandler):
)
)
if already_seen:
- logger.debug("Already seen pdu %s", pdu.event_id)
+ logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
return
# do some initial sanity-checking of the event. In particular, make
@@ -156,6 +185,7 @@ class FederationHandler(BaseHandler):
try:
self._sanity_check_event(pdu)
except SynapseError as err:
+ logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
raise FederationError(
"ERROR",
err.code,
@@ -165,10 +195,12 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
- if pdu.room_id in self.room_queues:
- logger.info("Ignoring PDU %s for room %s from %s for now; join "
- "in progress", pdu.event_id, pdu.room_id, origin)
- self.room_queues[pdu.room_id].append((pdu, origin))
+ if room_id in self.room_queues:
+ logger.info(
+ "[%s %s] Queuing PDU from %s for now: join in progress",
+ room_id, event_id, origin,
+ )
+ self.room_queues[room_id].append((pdu, origin))
return
# If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +211,7 @@ class FederationHandler(BaseHandler):
# we should check if we *are* in fact in the room. If we are then we
# can magically rejoin the room.
is_in_room = yield self.auth.check_host_in_room(
- pdu.room_id,
+ room_id,
self.server_name
)
if not is_in_room:
@@ -188,8 +220,8 @@ class FederationHandler(BaseHandler):
)
if was_in_room:
logger.info(
- "Ignoring PDU %s for room %s from %s as we've left the room!",
- pdu.event_id, pdu.room_id, origin,
+ "[%s %s] Ignoring PDU from %s as we've left the room",
+ room_id, event_id, origin,
)
defer.returnValue(None)
@@ -204,8 +236,8 @@ class FederationHandler(BaseHandler):
)
logger.debug(
- "_handle_new_pdu min_depth for %s: %d",
- pdu.room_id, min_depth
+ "[%s %s] min_depth: %d",
+ room_id, event_id, min_depth,
)
prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +250,18 @@ class FederationHandler(BaseHandler):
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
- if get_missing and prevs - seen:
+ missing_prevs = prevs - seen
+ if sent_to_us_directly and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
- "Acquiring lock for room %r to fetch %d missing events: %r...",
- pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+ "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
- "Acquired lock for room %r to fetch %d missing events",
- pdu.room_id, len(prevs - seen),
+ "[%s %s] Acquired room lock to fetch %d missing prev_events",
+ room_id, event_id, len(missing_prevs),
)
yield self._get_missing_events_for_pdu(
@@ -241,49 +274,91 @@ class FederationHandler(BaseHandler):
if not prevs - seen:
logger.info(
- "Found all missing prev events for %s", pdu.event_id
+ "[%s %s] Found all missing prev_events",
+ room_id, event_id,
)
- elif prevs - seen:
+ elif missing_prevs:
logger.info(
- "Not fetching %d missing events for room %r,event %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id,
- list(prevs - seen)[:5],
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
+ )
+
+ if prevs - seen:
+ # We've still not been able to get all of the prev_events for this event.
+ #
+ # In this case, we need to fall back to asking another server in the
+ # federation for the state at this event. That's ok provided we then
+ # resolve the state against other bits of the DAG before using it (which
+ # will ensure that you can't just take over a room by sending an event,
+ # withholding its prev_events, and declaring yourself to be an admin in
+ # the subsequent state request).
+ #
+ # Now, if we're pulling this event as a missing prev_event, then clearly
+ # this event is not going to become the only forward-extremity and we are
+ # guaranteed to resolve its state against our existing forward
+ # extremities, so that should be fine.
+ #
+ # On the other hand, if this event was pushed to us, it is possible for
+ # it to become the only forward-extremity in the room, and we would then
+ # trust its state to be the state for the whole room. This is very bad.
+ # Further, if the event was pushed to us, there is no excuse for us not to
+ # have all the prev_events. We therefore reject any such events.
+ #
+ # XXX this really feels like it could/should be merged with the above,
+ # but there is an interaction with min_depth that I'm not really
+ # following.
+
+ if sent_to_us_directly:
+ logger.warn(
+ "[%s %s] Failed to fetch %d prev events: rejecting",
+ room_id, event_id, len(prevs - seen),
+ )
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
)
- if sent_to_us_directly and prevs - seen:
- # If they have sent it to us directly, and the server
- # isn't telling us about the auth events that it's
- # made a message referencing, we explode
- raise FederationError(
- "ERROR",
- 403,
- (
- "Your server isn't divulging details about prev_events "
- "referenced in this event."
- ),
- affected=pdu.event_id,
- )
- elif prevs - seen:
# Calculate the state of the previous events, and
# de-conflict them to find the current state.
state_groups = []
auth_chains = set()
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ ours = yield self.store.get_state_groups(room_id, list(seen))
state_groups.append(ours)
# Ask the remote server for the states we don't
# know about
for p in prevs - seen:
- state, got_auth_chain = (
- yield self.federation_client.get_state_for_room(
- origin, pdu.room_id, p
- )
+ logger.info(
+ "[%s %s] Requesting state at missing prev_event %s",
+ room_id, event_id, p,
)
- auth_chains.update(got_auth_chain)
- state_group = {(x.type, x.state_key): x.event_id for x in state}
- state_groups.append(state_group)
+
+ with logcontext.nested_logging_context(p):
+ # note that if any of the missing prevs share missing state or
+ # auth events, the requests to fetch those events are deduped
+ # by the get_pdu_cache in federation_client.
+ remote_state, got_auth_chain = (
+ yield self.federation_client.get_state_for_room(
+ origin, room_id, p,
+ )
+ )
+
+ # XXX hrm I'm not convinced that duplicate events will compare
+ # for equality, so I'm not sure this does what the author
+ # hoped.
+ auth_chains.update(got_auth_chain)
+
+ state_group = {
+ (x.type, x.state_key): x.event_id for x in remote_state
+ }
+ state_groups.append(state_group)
# Resolve any conflicting state
def fetch(ev_ids):
@@ -291,19 +366,24 @@ class FederationHandler(BaseHandler):
ev_ids, get_prev_content=False, check_redacted=False
)
- room_version = yield self.store.get_room_version(pdu.room_id)
+ room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
- room_version, state_groups, {pdu.event_id: pdu}, fetch
+ room_version, state_groups, {event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
auth_chain = list(auth_chains)
except Exception:
+ logger.warn(
+ "[%s %s] Error attempting to resolve state at missing "
+ "prev_events",
+ room_id, event_id, exc_info=True,
+ )
raise FederationError(
"ERROR",
403,
"We can't get valid state history.",
- affected=pdu.event_id,
+ affected=event_id,
)
yield self._process_received_pdu(
@@ -322,15 +402,16 @@ class FederationHandler(BaseHandler):
prevs (set(str)): List of event ids which we are missing
min_depth (int): Minimum depth of events to return.
"""
- # We recalculate seen, since it may have changed.
+
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
- latest = yield self.store.get_latest_event_ids_in_room(
- pdu.room_id
- )
+ latest = yield self.store.get_latest_event_ids_in_room(room_id)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
@@ -338,8 +419,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
- "Missing %d events for room %r pdu %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+ "[%s %s]: Requesting %d prev_events: %s",
+ room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
# XXX: we set timeout to 10s to help workaround
@@ -392,7 +473,7 @@ class FederationHandler(BaseHandler):
missing_events = yield self.federation_client.get_missing_events(
origin,
- pdu.room_id,
+ room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
@@ -401,37 +482,47 @@ class FederationHandler(BaseHandler):
)
logger.info(
- "Got %d events: %r...",
- len(missing_events), [e.event_id for e in missing_events[:5]]
+ "[%s %s]: Got %d prev_events: %s",
+ room_id, event_id, len(missing_events), shortstr(missing_events),
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
- for e in missing_events:
- logger.info("Handling found event %s", e.event_id)
- try:
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
- except FederationError as e:
- if e.code == 403:
- logger.warn("Event %s failed history check.")
- else:
- raise
+ for ev in missing_events:
+ logger.info(
+ "[%s %s] Handling received prev_event %s",
+ room_id, event_id, ev.event_id,
+ )
+ with logcontext.nested_logging_context(ev.event_id):
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ ev,
+ sent_to_us_directly=False,
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn(
+ "[%s %s] Received prev_event %s failed history check.",
+ room_id, event_id, ev.event_id,
+ )
+ else:
+ raise
- @log_function
@defer.inlineCallbacks
- def _process_received_pdu(self, origin, pdu, state, auth_chain):
+ def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
"""
- event = pdu
+ room_id = event.room_id
+ event_id = event.event_id
- logger.debug("Processing event: %s", event)
+ logger.debug(
+ "[%s %s] Processing event: %s",
+ room_id, event_id, event,
+ )
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -440,15 +531,16 @@ class FederationHandler(BaseHandler):
# event.
if state and auth_chain and not event.internal_metadata.is_outlier():
is_in_room = yield self.auth.check_host_in_room(
- event.room_id,
+ room_id,
self.server_name
)
else:
is_in_room = True
+
if not is_in_room:
logger.info(
- "Got event for room we're not in: %r %r",
- event.room_id, event.event_id
+ "[%s %s] Got event for room we're not in",
+ room_id, event_id,
)
try:
@@ -460,7 +552,7 @@ class FederationHandler(BaseHandler):
"ERROR",
e.code,
e.msg,
- affected=event.event_id,
+ affected=event_id,
)
else:
@@ -493,6 +585,10 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
+ logger.info(
+ "[%s %s] persisting newly-received auth/state events %s",
+ room_id, event_id, [e["event"].event_id for e in event_infos]
+ )
yield self._handle_new_events(origin, event_infos)
try:
@@ -509,12 +605,12 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- room = yield self.store.get_room(event.room_id)
+ room = yield self.store.get_room(room_id)
if not room:
try:
yield self.store.store_room(
- room_id=event.room_id,
+ room_id=room_id,
room_creator_user_id="",
is_public=False,
)
@@ -542,7 +638,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield self.user_joined_room(user, event.room_id)
+ yield self.user_joined_room(user, room_id)
@log_function
@defer.inlineCallbacks
@@ -1056,7 +1152,8 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
- yield self.on_receive_pdu(origin, p)
+ with logcontext.nested_logging_context(p.event_id):
+ yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@@ -1459,12 +1556,10 @@ class FederationHandler(BaseHandler):
else:
defer.returnValue(None)
- @log_function
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
@defer.inlineCallbacks
- @log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False):
context = yield self._prep_event(
@@ -1504,15 +1599,22 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
- contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.run_in_background(
- self._prep_event,
+
+ @defer.inlineCallbacks
+ def prep(ev_info):
+ event = ev_info["event"]
+ with logcontext.nested_logging_context(suffix=event.event_id):
+ res = yield self._prep_event(
origin,
- ev_info["event"],
+ event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
+ defer.returnValue(res)
+
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))
@@ -1664,8 +1766,8 @@ class FederationHandler(BaseHandler):
)
except AuthError as e:
logger.warn(
- "Rejecting %s because %s",
- event.event_id, e.msg
+ "[%s %s] Rejecting: %s",
+ event.room_id, event.event_id, e.msg
)
context.rejected = RejectedReason.AUTH_ERROR
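The new `shortstr` helper keeps the `[room_id event_id]`-prefixed log lines bounded when an event references many prev_events. A quick usage sketch (function body copied from the hunk above):

```python
import itertools

def shortstr(iterable, maxitems=5):
    items = list(itertools.islice(iterable, maxitems + 1))
    if len(items) <= maxitems:
        return str(items)
    return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"

print(shortstr(["$a:hs", "$b:hs"]))
# ['$a:hs', '$b:hs']
print(shortstr("$ev%d:hs" % i for i in range(100)))
# ['$ev0:hs', '$ev1:hs', '$ev2:hs', '$ev3:hs', '$ev4:hs', ...]
```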
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce6a..f284d5a385 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
- room_id, str(e.message)
+ room_id, str(e)
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9bca4e7067..c7d69d9d80 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -722,6 +722,13 @@ class SyncHandler(object):
}
if full_state:
+ if lazy_load_members:
+ # always make sure we LL ourselves so we know we're in the room
+ # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+            # We only need to apply this on full state syncs given we disabled
+ # LL for incr syncs in #3840.
+ types.append((EventTypes.Member, sync_config.user.to_string()))
+
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id, types=types,
@@ -790,7 +797,7 @@ class SyncHandler(object):
else:
state_ids = {}
if lazy_load_members:
- if types:
+ if types and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
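For context on the lazy-loading fix above: `types` is a list of `(event type, state key)` pairs used to filter state, and appending the syncing user's own `m.room.member` entry ensures a full-state sync always includes our own membership. A toy illustration with hypothetical values:

```python
# types filters state events by (event type, state key) pairs;
# EventTypes.Member == "m.room.member" in synapse.
user_id = "@alice:example.com"
types = [("m.room.member", "@bob:example.com")]   # members we must lazy-load
types.append(("m.room.member", user_id))          # always include ourselves
print(types)
```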
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2d5c23e673..b4b25cab19 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -84,10 +84,21 @@ def wrap_json_request_handler(h):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
)
- respond_with_json(
- request, code, e.error_dict(), send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- )
+
+ # Only respond with an error response if we haven't already started
+            # writing, otherwise let's just kill the connection
+ if request.startedWriting:
+ if request.transport:
+ try:
+ request.transport.abortConnection()
+ except Exception:
+ # abortConnection throws if the connection is already closed
+ pass
+ else:
+ respond_with_json(
+ request, code, e.error_dict(), send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ )
except Exception:
# failure.Failure() fishes the original Failure out
@@ -100,16 +111,26 @@ def wrap_json_request_handler(h):
request,
f.getTraceback().rstrip(),
)
- respond_with_json(
- request,
- 500,
- {
- "error": "Internal server error",
- "errcode": Codes.UNKNOWN,
- },
- send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- )
+ # Only respond with an error response if we haven't already started
+            # writing, otherwise let's just kill the connection
+ if request.startedWriting:
+ if request.transport:
+ try:
+ request.transport.abortConnection()
+ except Exception:
+ # abortConnection throws if the connection is already closed
+ pass
+ else:
+ respond_with_json(
+ request,
+ 500,
+ {
+ "error": "Internal server error",
+ "errcode": Codes.UNKNOWN,
+ },
+ send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ )
return wrap_async_request_handler(wrapped_request_handler)
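The same "abort the connection if we have already started writing" logic now appears in both the `SynapseError` and generic-exception paths; a hypothetical helper (not part of this patch) shows the shared shape:

```python
def respond_or_abort(request, respond):
    """Run respond() unless the response has already started streaming, in
    which case the only safe option is to drop the connection."""
    if request.startedWriting:
        if request.transport:
            try:
                request.transport.abortConnection()
            except Exception:
                # abortConnection throws if the connection is already closed
                pass
    else:
        respond()
```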
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 9579e8cd0d..50be2de3bb 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -75,9 +75,9 @@ class SynapseRequest(Request):
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
self.__class__.__name__,
id(self),
- self.method,
+ self.method.decode('ascii', errors='replace'),
self.get_redacted_uri(),
- self.clientproto,
+ self.clientproto.decode('ascii', errors='replace'),
self.site.site_tag,
)
@@ -308,7 +308,7 @@ class XForwardedForRequest(SynapseRequest):
C{b"-"}.
"""
return self.requestHeaders.getRawHeaders(
- b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+ b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
class SynapseRequestFactory(object):
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 167167be0a..173908299c 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import threading
import six
@@ -23,6 +24,9 @@ from twisted.internet import defer
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+logger = logging.getLogger(__name__)
+
+
_background_process_start_count = Counter(
"synapse_background_process_start_count",
"Number of background processes started",
@@ -191,6 +195,8 @@ def run_as_background_process(desc, func, *args, **kwargs):
try:
yield func(*args, **kwargs)
+ except Exception:
+ logger.exception("Background process '%s' threw an exception", desc)
finally:
proc.update_metrics()
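The new `except Exception` block makes background-process failures attributable: previously an exception from `func` propagated out of the generator and surfaced, at best, as an anonymous unhandled-Deferred error. A simplified synchronous sketch of the behaviour (not the Twisted-based original):

```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("background_process")

def run_as_background_process_sketch(desc, func, *args, **kwargs):
    try:
        func(*args, **kwargs)
    except Exception:
        # Logged with the process description, so the output shows *which*
        # background process died rather than a bare traceback.
        logger.exception("Background process '%s' threw an exception", desc)

run_as_background_process_sketch("notify_app_services", lambda: 1 / 0)
```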
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f1d92c1395..340b16ce25 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import StreamToken
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
-from synapse.util.logcontext import PreserveLoggingContext, run_in_background
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -248,7 +249,10 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- run_in_background(self._notify_app_services, room_stream_id)
+ run_as_background_process(
+ "notify_app_services",
+ self._notify_app_services, room_stream_id,
+ )
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index b78ce10396..1a5a10d974 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -441,7 +441,7 @@ class Mailer(object):
def make_room_link(self, room_id):
if self.hs.config.email_riot_base_url:
- base_url = self.hs.config.email_riot_base_url
+ base_url = "%s/#/room" % (self.hs.config.email_riot_base_url)
elif self.app_name == "Vector":
# need /beta for Universal Links to work on iOS
base_url = "https://vector.im/beta/#/room"
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 0d8de600cf..0f339a0320 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -33,32 +33,35 @@ logger = logging.getLogger(__name__)
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
- "frozendict>=0.4": ["frozendict"],
+ "frozendict>=1": ["frozendict"],
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
- "service_identity>=1.0.0": ["service_identity>=1.0.0"],
+ "service_identity>=16.0.0": ["service_identity>=16.0.0"],
"Twisted>=17.1.0": ["twisted>=17.1.0"],
"treq>=15.1": ["treq>=15.1"],
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
- "pyyaml": ["yaml"],
- "pyasn1": ["pyasn1"],
- "daemonize": ["daemonize"],
- "bcrypt": ["bcrypt>=3.1.0"],
- "pillow": ["PIL"],
- "pydenticon": ["pydenticon"],
- "sortedcontainers": ["sortedcontainers"],
- "pysaml2>=3.0.0": ["saml2>=3.0.0"],
- "pymacaroons-pynacl": ["pymacaroons"],
+ "pyyaml>=3.11": ["yaml"],
+ "pyasn1>=0.1.9": ["pyasn1"],
+ "pyasn1-modules>=0.0.7": ["pyasn1_modules"],
+ "daemonize>=2.3.1": ["daemonize"],
+ "bcrypt>=3.1.0": ["bcrypt>=3.1.0"],
+ "pillow>=3.1.2": ["PIL"],
+ "pydenticon>=0.2": ["pydenticon"],
+ "sortedcontainers>=1.4.4": ["sortedcontainers"],
+ "pysaml2>=3.0.0": ["saml2"],
+ "pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
- "six": ["six"],
- "prometheus_client": ["prometheus_client"],
- "attrs": ["attr"],
+ "six>=1.10": ["six"],
+ "prometheus_client>=0.0.18": ["prometheus_client"],
+
+ # we use attr.s(slots), which arrived in 16.0.0
+ "attrs>=16.0.0": ["attr>=16.0.0"],
"netaddr>=0.7.18": ["netaddr"],
}
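In the `REQUIREMENTS` mapping, each key is the pip-style specifier echoed in "Missing Requirement" errors, and each value lists the importable module names used to verify the install. A rough, illustrative check of one entry (an assumption about the semantics, not synapse's actual implementation):

```python
import importlib

def check_entry(pip_spec, import_specs):
    for spec in import_specs:
        module_name = spec.split(">=")[0]   # e.g. "attr>=16.0.0" -> "attr"
        try:
            importlib.import_module(module_name)
        except ImportError:
            raise RuntimeError(
                'Missing Requirement: %s. To install run: pip install "%s"'
                % (module_name, pip_spec)
            )

check_entry("pyyaml>=3.11", ["yaml"])  # passes if PyYAML is installed
```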
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index ca90964d1d..f911b120b1 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -52,6 +52,7 @@ class DownloadResource(Resource):
b" script-src 'none';"
b" plugin-types application/pdf;"
b" style-src 'unsafe-inline';"
+ b" media-src 'self';"
b" object-src 'self';"
)
server_name, media_id, name = parse_media_id(request)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index cad2dec33a..af01040a38 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -79,7 +79,6 @@ class PreviewUrlResource(Resource):
# don't spider URLs more often than once an hour
expiry_ms=60 * 60 * 1000,
)
- self._cache.start()
self._cleaner_loop = self.clock.looping_call(
self._start_expire_url_cache_data, 10 * 1000,
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index d7ae22a661..b22495c1f9 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -95,10 +95,6 @@ class StateHandler(object):
self.hs = hs
self._state_resolution_handler = hs.get_state_resolution_handler()
- def start_caching(self):
- # TODO: remove this shim
- self._state_resolution_handler.start_caching()
-
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key="",
latest_event_ids=None):
@@ -428,9 +424,6 @@ class StateResolutionHandler(object):
self._state_cache = None
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
- def start_caching(self):
- logger.debug("start_caching")
-
self._state_cache = ExpiringCache(
cache_name="state_cache",
clock=self.clock,
@@ -440,8 +433,6 @@ class StateResolutionHandler(object):
reset_expiry_on_get=True,
)
- self._state_cache.start()
-
@defer.inlineCallbacks
@log_function
def resolve_state_groups(
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index c95477d318..7a7157b352 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_ids in itervalues(conflicted_state)
for event_id in event_ids
)
+ needed_event_count = len(needed_events)
if event_map is not None:
needed_events -= set(iterkeys(event_map))
- logger.info("Asking for %d conflicted events", len(needed_events))
+ logger.info(
+ "Asking for %d/%d conflicted events",
+ len(needed_events),
+ needed_event_count,
+ )
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
# the state events which are in conflict (and those in event_map)
@@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
)
new_needed_events = set(itervalues(auth_events))
+ new_needed_event_count = len(new_needed_events)
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(iterkeys(event_map))
- logger.info("Asking for %d auth events", len(new_needed_events))
+ logger.info(
+ "Asking for %d/%d auth events",
+ len(new_needed_events),
+ new_needed_event_count,
+ )
state_map_new = yield state_map_factory(new_needed_events)
state_map.update(state_map_new)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 8fc678fa67..9ad17b7c25 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -119,21 +119,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
- self._simple_upsert_txn(
- txn,
- table="user_ips",
- keyvalues={
- "user_id": user_id,
- "access_token": access_token,
- "ip": ip,
- "user_agent": user_agent,
- "device_id": device_id,
- },
- values={
- "last_seen": last_seen,
- },
- lock=False,
- )
+ try:
+ self._simple_upsert_txn(
+ txn,
+ table="user_ips",
+ keyvalues={
+ "user_id": user_id,
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ "device_id": device_id,
+ },
+ values={
+ "last_seen": last_seen,
+ },
+ lock=False,
+ )
+ except Exception as e:
+ # Failed to upsert, log and continue
+ logger.error("Failed to insert client IP %r: %r", entry, e)
@defer.inlineCallbacks
def get_last_client_ip_by_device(self, user_id, device_id):
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 59580949f1..0fe8c8e24c 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[bool]: True if a new entry was created, False if an
existing one was updated.
"""
+        # Am consciously deciding to lock the table on the basis that it ought
+        # never to be a big table and alternative approaches (batching multiple
+        # upserts into a single txn) introduced a lot of extra complexity.
+        # See https://github.com/matrix-org/synapse/issues/3854 for more details.
is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user",
table="monthly_active_users",
@@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={
"timestamp": int(self._clock.time_msec()),
},
- lock=False,
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 4b971efdba..3f4cbd61c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
- def get_state_groups_ids(self, room_id, event_ids):
+ def get_state_groups_ids(self, _room_id, event_ids):
+ """Get the event IDs of all the state for the state groups for the given events
+
+ Args:
+ _room_id (str): id of the room for these events
+ event_ids (iterable[str]): ids of the events
+
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+ """
if not event_ids:
defer.returnValue({})
@@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_state_ids_for_group(self, state_group):
- """Get the state IDs for the given state group
+ """Get the event IDs of all the state in the given state group
Args:
state_group (int)
@@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_state_groups(self, room_id, event_ids):
""" Get the state groups for the given list of event_ids
- The return value is a dict mapping group names to lists of events.
+ Returns:
+ Deferred[dict[int, list[EventBase]]]:
+ dict of state_group_id -> list of state events.
"""
if not event_ids:
defer.returnValue({})
@@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
member events (if True), or to exclude member events (if False)
Returns:
- dictionary state_group -> (dict of (type, state_key) -> event id)
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+            dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
results = {}
@@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- Deferred[dict[int, dict[(type, state_key), EventBase]]]
- a dictionary mapping from state group to state dictionary.
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types is not None:
non_member_types = [t for t in types if t[0] != EventTypes.Member]
@@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- Deferred[dict[int, dict[(type, state_key), EventBase]]]
- a dictionary mapping from state group to state dictionary.
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types:
types = frozenset(types)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 0c42bd3322..baf0379a68 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
@@ -156,7 +155,6 @@ class TransactionStore(SQLBaseStore):
"""
pass
- @cached(max_entries=10000)
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
@@ -198,8 +196,6 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms
"""
- # XXX: we could chose to not bother persisting this if our cache thinks
- # this is a NOOP
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
@@ -212,10 +208,6 @@ class TransactionStore(SQLBaseStore):
retry_last_ts, retry_interval):
self.database_engine.lock_table(txn, "destinations")
- self._invalidate_cache_and_stream(
- txn, self.get_destination_retry_timings, (destination,)
- )
-
# We need to be careful here as the data may have changed from under us
# due to a worker setting the timings.
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 7b065b195e..f37d5bec08 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import os
import six
@@ -20,6 +21,8 @@ from six.moves import intern
from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
+logger = logging.getLogger(__name__)
+
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
@@ -76,16 +79,20 @@ def register_cache(cache_type, cache_name, cache):
return []
def collect(self):
- if cache_type == "response_cache":
- response_cache_size.labels(cache_name).set(len(cache))
- response_cache_hits.labels(cache_name).set(self.hits)
- response_cache_evicted.labels(cache_name).set(self.evicted_size)
- response_cache_total.labels(cache_name).set(self.hits + self.misses)
- else:
- cache_size.labels(cache_name).set(len(cache))
- cache_hits.labels(cache_name).set(self.hits)
- cache_evicted.labels(cache_name).set(self.evicted_size)
- cache_total.labels(cache_name).set(self.hits + self.misses)
+ try:
+ if cache_type == "response_cache":
+ response_cache_size.labels(cache_name).set(len(cache))
+ response_cache_hits.labels(cache_name).set(self.hits)
+ response_cache_evicted.labels(cache_name).set(self.evicted_size)
+ response_cache_total.labels(cache_name).set(self.hits + self.misses)
+ else:
+ cache_size.labels(cache_name).set(len(cache))
+ cache_hits.labels(cache_name).set(self.hits)
+ cache_evicted.labels(cache_name).set(self.evicted_size)
+ cache_total.labels(cache_name).set(self.hits + self.misses)
+ except Exception as e:
+ logger.warn("Error calculating metrics for %s: %s", cache_name, e)
+ raise
yield GaugeMetricFamily("__unused", "")
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index ce85b2ae11..9af4ec4aa8 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,6 +16,8 @@
import logging
from collections import OrderedDict
+from six import itervalues
+
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
@@ -54,11 +56,8 @@ class ExpiringCache(object):
self.iterable = iterable
- self._size_estimate = 0
-
self.metrics = register_cache("expiring", cache_name, self)
- def start(self):
if not self._expiry_ms:
# Don't bother starting the loop if things never expire
return
@@ -75,16 +74,11 @@ class ExpiringCache(object):
now = self._clock.time_msec()
self._cache[key] = _CacheEntry(now, value)
- if self.iterable:
- self._size_estimate += len(value)
-
# Evict if there are now too many items
while self._max_len and len(self) > self._max_len:
_key, value = self._cache.popitem(last=False)
if self.iterable:
- removed_len = len(value.value)
- self.metrics.inc_evictions(removed_len)
- self._size_estimate -= removed_len
+ self.metrics.inc_evictions(len(value.value))
else:
self.metrics.inc_evictions()
@@ -135,7 +129,9 @@ class ExpiringCache(object):
for k in keys_to_delete:
value = self._cache.pop(k)
if self.iterable:
- self._size_estimate -= len(value.value)
+ self.metrics.inc_evictions(len(value.value))
+ else:
+ self.metrics.inc_evictions()
logger.debug(
"[%s] _prune_cache before: %d, after len: %d",
@@ -144,7 +140,7 @@ class ExpiringCache(object):
def __len__(self):
if self.iterable:
- return self._size_estimate
+ return sum(len(entry.value) for entry in itervalues(self._cache))
else:
return len(self._cache)
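Two things fall out of this hunk: the pruning loop now starts from the constructor (so the per-app `start()` calls removed elsewhere in this patch become unnecessary), and for `iterable` caches `__len__` is computed on demand instead of via a running `_size_estimate` that could drift. A toy model of the new length semantics:

```python
from collections import OrderedDict

class _CacheEntry(object):
    def __init__(self, value):
        self.value = value

cache = OrderedDict()
cache["state_group_1"] = _CacheEntry(["$ev1", "$ev2"])
cache["state_group_2"] = _CacheEntry(["$ev3"])

# iterable=True: length counts the items inside the values, not the entries.
print(sum(len(entry.value) for entry in cache.values()))  # 3
# iterable=False would simply be len(cache), i.e. 2
```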
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a0c2d37610..89224b26cc 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -200,7 +200,7 @@ class LoggingContext(object):
sentinel = Sentinel()
- def __init__(self, name=None, parent_context=None):
+ def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name
@@ -218,6 +218,13 @@ class LoggingContext(object):
self.parent_context = parent_context
+ if self.parent_context is not None:
+ self.parent_context.copy_to(self)
+
+ if request is not None:
+ # the request param overrides the request from the parent context
+ self.request = request
+
def __str__(self):
return "%s@%x" % (self.name, id(self))
@@ -256,9 +263,6 @@ class LoggingContext(object):
)
self.alive = True
- if self.parent_context is not None:
- self.parent_context.copy_to(self)
-
return self
def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
)
+def nested_logging_context(suffix, parent_context=None):
+ """Creates a new logging context as a child of another.
+
+ The nested logging context will have a 'request' made up of the parent context's
+ request, plus the given suffix.
+
+ CPU/db usage stats will be added to the parent context's on exit.
+
+ Normal usage looks like:
+
+ with nested_logging_context(suffix):
+ # ... do stuff
+
+ Args:
+ suffix (str): suffix to add to the parent context's 'request'.
+ parent_context (LoggingContext|None): parent context. Will use the current context
+ if None.
+
+ Returns:
+ LoggingContext: new logging context.
+ """
+ if parent_context is None:
+ parent_context = LoggingContext.current_context()
+ return LoggingContext(
+ parent_context=parent_context,
+ request=parent_context.request + "-" + suffix,
+ )
+
+
def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):
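The effect of `nested_logging_context` on log output: each child context's `request` is the parent's `request` plus `-` and the suffix, so a PDU handled inside a federation transaction logs under something like `PUT-12345-$event_id`. A toy model of just the naming scheme:

```python
class ContextSketch(object):
    def __init__(self, request):
        self.request = request

def nested(parent, suffix):
    # mirrors: parent_context.request + "-" + suffix
    return ContextSketch(parent.request + "-" + suffix)

parent = ContextSketch("PUT-12345")
child = nested(parent, "$someevent:example.com")
print(child.request)  # PUT-12345-$someevent:example.com
```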
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8a3a06fd74..26cce7d197 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
else:
self.retry_interval = self.min_retry_interval
- logger.debug(
+ logger.info(
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
self.destination, exc_type, exc_val, self.retry_interval
)
|