diff --git a/changelog.d/3924.misc b/changelog.d/3924.misc
new file mode 100644
index 0000000000..8d010e0bd9
--- /dev/null
+++ b/changelog.d/3924.misc
@@ -0,0 +1 @@
+Comments and interface cleanup for on_receive_pdu
\ No newline at end of file
diff --git a/changelog.d/3948.misc b/changelog.d/3948.misc
new file mode 100644
index 0000000000..a93edbd1c3
--- /dev/null
+++ b/changelog.d/3948.misc
@@ -0,0 +1 @@
+Fix incompatibility with python3 on alpine
\ No newline at end of file
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
deleted file mode 100755
index d658f967ba..0000000000
--- a/synapse/app/synctl.py
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import argparse
-import collections
-import errno
-import glob
-import os
-import os.path
-import signal
-import subprocess
-import sys
-import time
-
-from six import iteritems
-
-import yaml
-
-SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
-
-GREEN = "\x1b[1;32m"
-YELLOW = "\x1b[1;33m"
-RED = "\x1b[1;31m"
-NORMAL = "\x1b[m"
-
-
-def pid_running(pid):
- try:
- os.kill(pid, 0)
- return True
- except OSError as err:
- if err.errno == errno.EPERM:
- return True
- return False
-
-
-def write(message, colour=NORMAL, stream=sys.stdout):
- if colour == NORMAL:
- stream.write(message + "\n")
- else:
- stream.write(colour + message + NORMAL + "\n")
-
-
-def abort(message, colour=RED, stream=sys.stderr):
- write(message, colour, stream)
- sys.exit(1)
-
-
-def start(configfile):
- write("Starting ...")
- args = SYNAPSE
- args.extend(["--daemonize", "-c", configfile])
-
- try:
- subprocess.check_call(args)
- write("started synapse.app.homeserver(%r)" %
- (configfile,), colour=GREEN)
- except subprocess.CalledProcessError as e:
- write(
- "error starting (exit code: %d); see above for logs" % e.returncode,
- colour=RED,
- )
-
-
-def start_worker(app, configfile, worker_configfile):
- args = [
- "python", "-B",
- "-m", app,
- "-c", configfile,
- "-c", worker_configfile
- ]
-
- try:
- subprocess.check_call(args)
- write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
- except subprocess.CalledProcessError as e:
- write(
- "error starting %s(%r) (exit code: %d); see above for logs" % (
- app, worker_configfile, e.returncode,
- ),
- colour=RED,
- )
-
-
-def stop(pidfile, app):
- if os.path.exists(pidfile):
- pid = int(open(pidfile).read())
- try:
- os.kill(pid, signal.SIGTERM)
- write("stopped %s" % (app,), colour=GREEN)
- except OSError as err:
- if err.errno == errno.ESRCH:
- write("%s not running" % (app,), colour=YELLOW)
- elif err.errno == errno.EPERM:
- abort("Cannot stop %s: Operation not permitted" % (app,))
- else:
- abort("Cannot stop %s: Unknown error" % (app,))
-
-
-Worker = collections.namedtuple("Worker", [
- "app", "configfile", "pidfile", "cache_factor"
-])
-
-
-def main():
-
- parser = argparse.ArgumentParser()
-
- parser.add_argument(
- "action",
- choices=["start", "stop", "restart"],
- help="whether to start, stop or restart the synapse",
- )
- parser.add_argument(
- "configfile",
- nargs="?",
- default="homeserver.yaml",
- help="the homeserver config file, defaults to homeserver.yaml",
- )
- parser.add_argument(
- "-w", "--worker",
- metavar="WORKERCONFIG",
- help="start or stop a single worker",
- )
- parser.add_argument(
- "-a", "--all-processes",
- metavar="WORKERCONFIGDIR",
- help="start or stop all the workers in the given directory"
- " and the main synapse process",
- )
-
- options = parser.parse_args()
-
- if options.worker and options.all_processes:
- write(
- 'Cannot use "--worker" with "--all-processes"',
- stream=sys.stderr
- )
- sys.exit(1)
-
- configfile = options.configfile
-
- if not os.path.exists(configfile):
- write(
- "No config file found\n"
- "To generate a config file, run '%s -c %s --generate-config"
- " --server-name=<server name>'\n" % (
- " ".join(SYNAPSE), options.configfile
- ),
- stream=sys.stderr,
- )
- sys.exit(1)
-
- with open(configfile) as stream:
- config = yaml.load(stream)
-
- pidfile = config["pid_file"]
- cache_factor = config.get("synctl_cache_factor")
- start_stop_synapse = True
-
- if cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
-
- cache_factors = config.get("synctl_cache_factors", {})
- for cache_name, factor in iteritems(cache_factors):
- os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
-
- worker_configfiles = []
- if options.worker:
- start_stop_synapse = False
- worker_configfile = options.worker
- if not os.path.exists(worker_configfile):
- write(
- "No worker config found at %r" % (worker_configfile,),
- stream=sys.stderr,
- )
- sys.exit(1)
- worker_configfiles.append(worker_configfile)
-
- if options.all_processes:
- # To start the main synapse with -a you need to add a worker file
- # with worker_app == "synapse.app.homeserver"
- start_stop_synapse = False
- worker_configdir = options.all_processes
- if not os.path.isdir(worker_configdir):
- write(
- "No worker config directory found at %r" % (worker_configdir,),
- stream=sys.stderr,
- )
- sys.exit(1)
- worker_configfiles.extend(sorted(glob.glob(
- os.path.join(worker_configdir, "*.yaml")
- )))
-
- workers = []
- for worker_configfile in worker_configfiles:
- with open(worker_configfile) as stream:
- worker_config = yaml.load(stream)
- worker_app = worker_config["worker_app"]
- if worker_app == "synapse.app.homeserver":
- # We need to special case all of this to pick up options that may
- # be set in the main config file or in this worker config file.
- worker_pidfile = (
- worker_config.get("pid_file")
- or pidfile
- )
- worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
- daemonize = worker_config.get("daemonize") or config.get("daemonize")
- assert daemonize, "Main process must have daemonize set to true"
-
- # The master process doesn't support using worker_* config.
- for key in worker_config:
- if key == "worker_app": # But we allow worker_app
- continue
- assert not key.startswith("worker_"), \
- "Main process cannot use worker_* config"
- else:
- worker_pidfile = worker_config["worker_pid_file"]
- worker_daemonize = worker_config["worker_daemonize"]
- assert worker_daemonize, "In config %r: expected '%s' to be True" % (
- worker_configfile, "worker_daemonize")
- worker_cache_factor = worker_config.get("synctl_cache_factor")
- workers.append(Worker(
- worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
- ))
-
- action = options.action
-
- if action == "stop" or action == "restart":
- for worker in workers:
- stop(worker.pidfile, worker.app)
-
- if start_stop_synapse:
- stop(pidfile, "synapse.app.homeserver")
-
- # Wait for synapse to actually shutdown before starting it again
- if action == "restart":
- running_pids = []
- if start_stop_synapse and os.path.exists(pidfile):
- running_pids.append(int(open(pidfile).read()))
- for worker in workers:
- if os.path.exists(worker.pidfile):
- running_pids.append(int(open(worker.pidfile).read()))
- if len(running_pids) > 0:
- write("Waiting for process to exit before restarting...")
- for running_pid in running_pids:
- while pid_running(running_pid):
- time.sleep(0.2)
- write("All processes exited; now restarting...")
-
- if action == "start" or action == "restart":
- if start_stop_synapse:
- # Check if synapse is already running
- if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
- abort("synapse.app.homeserver already running")
- start(configfile)
-
- for worker in workers:
- if worker.cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
-
- start_worker(worker.app, configfile, worker.configfile)
-
- if cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
- else:
- os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
-
-
-if __name__ == "__main__":
- main()
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index dbee404ea7..9a571e4fc7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -618,7 +618,7 @@ class FederationServer(FederationBase):
)
yield self.handler.on_receive_pdu(
- origin, pdu, get_missing=True, sent_to_us_directly=True,
+ origin, pdu, sent_to_us_directly=True,
)
def __str__(self):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8d6bd7976d..2ccdc3bfa7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -136,7 +136,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_receive_pdu(
- self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ self, origin, pdu, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -145,7 +145,8 @@ class FederationHandler(BaseHandler):
origin (str): server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
pdu (FrozenEvent): received PDU
- get_missing (bool): True if we should fetch missing prev_events
+ sent_to_us_directly (bool): True if this event was pushed to us; False if
+ we pulled it as the result of a missing prev_event.
Returns (Deferred): completes with None
"""
@@ -250,7 +251,7 @@ class FederationHandler(BaseHandler):
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
missing_prevs = prevs - seen
- if get_missing and missing_prevs:
+ if sent_to_us_directly and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
@@ -282,24 +283,46 @@ class FederationHandler(BaseHandler):
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
- if sent_to_us_directly and prevs - seen:
- # If they have sent it to us directly, and the server
- # isn't telling us about the auth events that it's
- # made a message referencing, we explode
- logger.warn(
- "[%s %s] Failed to fetch %d prev events: rejecting",
- room_id, event_id, len(prevs - seen),
- )
- raise FederationError(
- "ERROR",
- 403,
- (
- "Your server isn't divulging details about prev_events "
- "referenced in this event."
- ),
- affected=pdu.event_id,
- )
- elif prevs - seen:
+ if prevs - seen:
+ # We've still not been able to get all of the prev_events for this event.
+ #
+ # In this case, we need to fall back to asking another server in the
+ # federation for the state at this event. That's ok provided we then
+ # resolve the state against other bits of the DAG before using it (which
+ # will ensure that you can't just take over a room by sending an event,
+ # withholding its prev_events, and declaring yourself to be an admin in
+ # the subsequent state request).
+ #
+ # Now, if we're pulling this event as a missing prev_event, then clearly
+ # this event is not going to become the only forward-extremity and we are
+ # guaranteed to resolve its state against our existing forward
+ # extremities, so that should be fine.
+ #
+ # On the other hand, if this event was pushed to us, it is possible for
+ # it to become the only forward-extremity in the room, and we would then
+ # trust its state to be the state for the whole room. This is very bad.
+ # Further, if the event was pushed to us, there is no excuse for us not to
+ # have all the prev_events. We therefore reject any such events.
+ #
+ # XXX this really feels like it could/should be merged with the above,
+ # but there is an interaction with min_depth that I'm not really
+ # following.
+
+ if sent_to_us_directly:
+ logger.warn(
+ "[%s %s] Failed to fetch %d prev events: rejecting",
+ room_id, event_id, len(prevs - seen),
+ )
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
+ )
+
# Calculate the state of the previous events, and
# de-conflict them to find the current state.
state_groups = []
@@ -464,7 +487,7 @@ class FederationHandler(BaseHandler):
yield self.on_receive_pdu(
origin,
ev,
- get_missing=False
+ sent_to_us_directly=False,
)
except FederationError as e:
if e.code == 403:
@@ -1112,7 +1135,7 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
- yield self.on_receive_pdu(origin, p)
+ yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
diff --git a/synctl b/synctl
index 1bdceda20a..d658f967ba 120000..100755
--- a/synctl
+++ b/synctl
@@ -1 +1,284 @@
-./synapse/app/synctl.py
\ No newline at end of file
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import collections
+import errno
+import glob
+import os
+import os.path
+import signal
+import subprocess
+import sys
+import time
+
+from six import iteritems
+
+import yaml
+
+SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
+
+GREEN = "\x1b[1;32m"
+YELLOW = "\x1b[1;33m"
+RED = "\x1b[1;31m"
+NORMAL = "\x1b[m"
+
+
+def pid_running(pid):
+ try:
+ os.kill(pid, 0)
+ return True
+ except OSError as err:
+ if err.errno == errno.EPERM:
+ return True
+ return False
+
+
+def write(message, colour=NORMAL, stream=sys.stdout):
+ if colour == NORMAL:
+ stream.write(message + "\n")
+ else:
+ stream.write(colour + message + NORMAL + "\n")
+
+
+def abort(message, colour=RED, stream=sys.stderr):
+ write(message, colour, stream)
+ sys.exit(1)
+
+
+def start(configfile):
+ write("Starting ...")
+ args = SYNAPSE
+ args.extend(["--daemonize", "-c", configfile])
+
+ try:
+ subprocess.check_call(args)
+ write("started synapse.app.homeserver(%r)" %
+ (configfile,), colour=GREEN)
+ except subprocess.CalledProcessError as e:
+ write(
+ "error starting (exit code: %d); see above for logs" % e.returncode,
+ colour=RED,
+ )
+
+
+def start_worker(app, configfile, worker_configfile):
+ args = [
+ "python", "-B",
+ "-m", app,
+ "-c", configfile,
+ "-c", worker_configfile
+ ]
+
+ try:
+ subprocess.check_call(args)
+ write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
+ except subprocess.CalledProcessError as e:
+ write(
+ "error starting %s(%r) (exit code: %d); see above for logs" % (
+ app, worker_configfile, e.returncode,
+ ),
+ colour=RED,
+ )
+
+
+def stop(pidfile, app):
+ if os.path.exists(pidfile):
+ pid = int(open(pidfile).read())
+ try:
+ os.kill(pid, signal.SIGTERM)
+ write("stopped %s" % (app,), colour=GREEN)
+ except OSError as err:
+ if err.errno == errno.ESRCH:
+ write("%s not running" % (app,), colour=YELLOW)
+ elif err.errno == errno.EPERM:
+ abort("Cannot stop %s: Operation not permitted" % (app,))
+ else:
+ abort("Cannot stop %s: Unknown error" % (app,))
+
+
+Worker = collections.namedtuple("Worker", [
+ "app", "configfile", "pidfile", "cache_factor"
+])
+
+
+def main():
+
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument(
+ "action",
+ choices=["start", "stop", "restart"],
+ help="whether to start, stop or restart the synapse",
+ )
+ parser.add_argument(
+ "configfile",
+ nargs="?",
+ default="homeserver.yaml",
+ help="the homeserver config file, defaults to homeserver.yaml",
+ )
+ parser.add_argument(
+ "-w", "--worker",
+ metavar="WORKERCONFIG",
+ help="start or stop a single worker",
+ )
+ parser.add_argument(
+ "-a", "--all-processes",
+ metavar="WORKERCONFIGDIR",
+ help="start or stop all the workers in the given directory"
+ " and the main synapse process",
+ )
+
+ options = parser.parse_args()
+
+ if options.worker and options.all_processes:
+ write(
+ 'Cannot use "--worker" with "--all-processes"',
+ stream=sys.stderr
+ )
+ sys.exit(1)
+
+ configfile = options.configfile
+
+ if not os.path.exists(configfile):
+ write(
+ "No config file found\n"
+ "To generate a config file, run '%s -c %s --generate-config"
+ " --server-name=<server name>'\n" % (
+ " ".join(SYNAPSE), options.configfile
+ ),
+ stream=sys.stderr,
+ )
+ sys.exit(1)
+
+ with open(configfile) as stream:
+ config = yaml.load(stream)
+
+ pidfile = config["pid_file"]
+ cache_factor = config.get("synctl_cache_factor")
+ start_stop_synapse = True
+
+ if cache_factor:
+ os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+
+ cache_factors = config.get("synctl_cache_factors", {})
+ for cache_name, factor in iteritems(cache_factors):
+ os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
+ worker_configfiles = []
+ if options.worker:
+ start_stop_synapse = False
+ worker_configfile = options.worker
+ if not os.path.exists(worker_configfile):
+ write(
+ "No worker config found at %r" % (worker_configfile,),
+ stream=sys.stderr,
+ )
+ sys.exit(1)
+ worker_configfiles.append(worker_configfile)
+
+ if options.all_processes:
+ # To start the main synapse with -a you need to add a worker file
+ # with worker_app == "synapse.app.homeserver"
+ start_stop_synapse = False
+ worker_configdir = options.all_processes
+ if not os.path.isdir(worker_configdir):
+ write(
+ "No worker config directory found at %r" % (worker_configdir,),
+ stream=sys.stderr,
+ )
+ sys.exit(1)
+ worker_configfiles.extend(sorted(glob.glob(
+ os.path.join(worker_configdir, "*.yaml")
+ )))
+
+ workers = []
+ for worker_configfile in worker_configfiles:
+ with open(worker_configfile) as stream:
+ worker_config = yaml.load(stream)
+ worker_app = worker_config["worker_app"]
+ if worker_app == "synapse.app.homeserver":
+ # We need to special case all of this to pick up options that may
+ # be set in the main config file or in this worker config file.
+ worker_pidfile = (
+ worker_config.get("pid_file")
+ or pidfile
+ )
+ worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
+ daemonize = worker_config.get("daemonize") or config.get("daemonize")
+ assert daemonize, "Main process must have daemonize set to true"
+
+ # The master process doesn't support using worker_* config.
+ for key in worker_config:
+ if key == "worker_app": # But we allow worker_app
+ continue
+ assert not key.startswith("worker_"), \
+ "Main process cannot use worker_* config"
+ else:
+ worker_pidfile = worker_config["worker_pid_file"]
+ worker_daemonize = worker_config["worker_daemonize"]
+ assert worker_daemonize, "In config %r: expected '%s' to be True" % (
+ worker_configfile, "worker_daemonize")
+ worker_cache_factor = worker_config.get("synctl_cache_factor")
+ workers.append(Worker(
+ worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+ ))
+
+ action = options.action
+
+ if action == "stop" or action == "restart":
+ for worker in workers:
+ stop(worker.pidfile, worker.app)
+
+ if start_stop_synapse:
+ stop(pidfile, "synapse.app.homeserver")
+
+ # Wait for synapse to actually shutdown before starting it again
+ if action == "restart":
+ running_pids = []
+ if start_stop_synapse and os.path.exists(pidfile):
+ running_pids.append(int(open(pidfile).read()))
+ for worker in workers:
+ if os.path.exists(worker.pidfile):
+ running_pids.append(int(open(worker.pidfile).read()))
+ if len(running_pids) > 0:
+ write("Waiting for process to exit before restarting...")
+ for running_pid in running_pids:
+ while pid_running(running_pid):
+ time.sleep(0.2)
+ write("All processes exited; now restarting...")
+
+ if action == "start" or action == "restart":
+ if start_stop_synapse:
+ # Check if synapse is already running
+ if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
+ abort("synapse.app.homeserver already running")
+ start(configfile)
+
+ for worker in workers:
+ if worker.cache_factor:
+ os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+
+ start_worker(worker.app, configfile, worker.configfile)
+
+ if cache_factor:
+ os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+ else:
+ os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
+
+
+if __name__ == "__main__":
+ main()
|