diff --git a/synapse/__init__.py b/synapse/__init__.py
index 5bc24863d9..0c01546789 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,6 +17,13 @@
""" This is a reference implementation of a Matrix home server.
"""
+import sys
+
+# Check that we're not running on an unsupported Python version.
+if sys.version_info < (3, 5):
+ print("Synapse requires Python 3.5 or above.")
+ sys.exit(1)
+
try:
from twisted.internet import protocol
from twisted.internet.protocol import Factory
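
Placing this guard above the Twisted imports means an unsupported interpreter exits with a clear message instead of an opaque ImportError from a dependency. The tuple comparison also does the right thing at the boundary, since sys.version_info is a named tuple that compares elementwise; a standalone illustration (not part of the tree):

    # sys.version_info compares elementwise against a shorter tuple:
    assert (2, 7, 16, "final", 0) < (3, 5)         # Python 2.7 -> exits
    assert not ((3, 5, 0, "final", 0) < (3, 5))    # Python 3.5.0 -> passes
    assert not ((3, 7, 3, "final", 0) < (3, 5))    # anything newer -> passes
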
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 5a46b11bc0..eaf7234ee3 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -540,6 +540,7 @@ def run(hs):
stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
+ stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index ae04252906..86018dfcce 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -19,15 +19,12 @@ from __future__ import print_function
# This file can't be called email.py because if it is, we cannot:
import email.utils
-import logging
import os
import pkg_resources
from ._base import Config, ConfigError
-logger = logging.getLogger(__name__)
-
class EmailConfig(Config):
def read_config(self, config):
@@ -85,10 +82,12 @@ class EmailConfig(Config):
self.email_password_reset_behaviour = (
"remote" if email_trust_identity_server_for_password_resets else "local"
)
+ self.password_resets_were_disabled_due_to_email_config = False
if self.email_password_reset_behaviour == "local" and email_config == {}:
- logger.warn(
- "User password resets have been disabled due to lack of email config"
- )
+ # We cannot warn the user this has happened here
+ # Instead do so when a user attempts to reset their password
+ self.password_resets_were_disabled_due_to_email_config = True
+
self.email_password_reset_behaviour = "off"
# Get lifetime of a validation token in milliseconds
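
The flag replaces the removed logger because nothing can usefully be logged while the config is still being read; the warning is deferred to the servlet that handles the actual reset request (see the account.py hunk below). The pattern, reduced to a hypothetical standalone sketch:

    import logging

    logger = logging.getLogger(__name__)

    class ExampleConfig(object):
        """Hypothetical config object using the deferred-warning pattern."""

        def read_config(self, config):
            # Can't usefully warn at config-read time; just record it.
            self.feature_was_disabled = config.get("feature", {}) == {}

    def handle_request(config):
        # ...and surface the warning when a request actually arrives.
        if config.feature_was_disabled:
            logger.warning("Feature disabled due to missing config")
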
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 564c57203d..22a2735405 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -189,11 +189,21 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
- device_message_edus, device_stream_id, dev_list_id = (
- # We have to keep 2 free slots for presence and rr_edus
- yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
+ # We have to keep 2 free slots for presence and rr_edus
+ limit = MAX_EDUS_PER_TRANSACTION - 2
+
+ device_update_edus, dev_list_id = (
+ yield self._get_device_update_edus(limit)
+ )
+
+ limit -= len(device_update_edus)
+
+ to_device_edus, device_stream_id = (
+ yield self._get_to_device_message_edus(limit)
)
+ pending_edus = device_update_edus + to_device_edus
+
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
@@ -208,10 +218,6 @@ class PerDestinationQueue(object):
# We can only include at most 50 PDUs per transaction
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
- pending_edus = []
-
- # We can only include at most 100 EDUs per transactions
- # rr_edus and pending_presence take at most one slot each
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
@@ -232,7 +238,6 @@ class PerDestinationQueue(object):
)
)
- pending_edus.extend(device_message_edus)
pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
@@ -272,10 +277,13 @@ class PerDestinationQueue(object):
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
- if device_message_edus:
+ if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
+
+ # also mark the device updates as sent
+ if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
@@ -347,7 +355,7 @@ class PerDestinationQueue(object):
return pending_edus
@defer.inlineCallbacks
- def _get_new_device_messages(self, limit):
+ def _get_device_update_edus(self, limit):
last_device_list = self._last_device_list_stream_id
# Retrieve list of new device updates to send to the destination
@@ -366,15 +374,19 @@ class PerDestinationQueue(object):
assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
+ defer.returnValue((edus, now_stream_id))
+
+ @defer.inlineCallbacks
+ def _get_to_device_message_edus(self, limit):
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination,
last_device_stream_id,
to_device_stream_id,
- limit - len(edus),
+ limit,
)
- edus.extend(
+ edus = [
Edu(
origin=self._server_name,
destination=self._destination,
@@ -382,6 +394,6 @@ class PerDestinationQueue(object):
content=content,
)
for content in contents
- )
+ ]
- defer.returnValue((edus, stream_id, now_stream_id))
+ defer.returnValue((edus, stream_id))
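
The single _get_new_device_messages is split so that a shared EDU budget can be threaded through both fetches: device-list updates claim slots first, to-device messages take what remains, and the two stream tokens travel separately so each kind can be acknowledged independently after a successful send (see the delete/mark-as-sent hunk above). The budgeting, as a hypothetical standalone sketch:

    MAX_EDUS_PER_TRANSACTION = 100  # cap on EDUs per federation transaction

    def assemble_edus(get_device_update_edus, get_to_device_edus):
        # Hold two slots back for read receipts and presence.
        limit = MAX_EDUS_PER_TRANSACTION - 2

        device_update_edus, dev_list_id = get_device_update_edus(limit)

        # To-device messages may only use whatever the device-list
        # updates left unclaimed.
        limit -= len(device_update_edus)
        to_device_edus, stream_id = get_to_device_edus(limit)

        return device_update_edus + to_device_edus, dev_list_id, stream_id
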
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 32e004e53e..ac8f75d256 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -132,6 +132,13 @@ class DeactivateAccountHandler(BaseHandler):
# parts users from rooms (if it isn't already running)
self._start_user_parting()
+ # Remove all information on the user from the account_validity table.
+ if self._account_validity_enabled:
+ yield self.store.delete_account_validity_for_user(user_id)
+
+ # Mark the user as deactivated.
+ yield self.store.set_user_deactivated_status(user_id, True)
+
defer.returnValue(identity_server_supports_unbinding)
@defer.inlineCallbacks
diff --git a/synapse/http/client.py b/synapse/http/client.py
index bf8afa703e..d1869f308b 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -17,7 +17,7 @@
import logging
from io import BytesIO
-from six import text_type
+from six import raise_from, text_type
from six.moves import urllib
import treq
@@ -555,10 +555,15 @@ class SimpleHttpClient(object):
length = yield make_deferred_yieldable(
_readBodyToFile(response, output_stream, max_size)
)
+ except SynapseError:
+ # This can happen e.g. because the body is too large.
+ raise
except Exception as e:
- logger.exception("Failed to download body")
- raise SynapseError(
- 502, ("Failed to download remote body: %s" % e), Codes.UNKNOWN
+ raise_from(
+ SynapseError(
+ 502, ("Failed to download remote body: %s" % e),
+ ),
+ e
)
defer.returnValue(
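
six.raise_from(exc, cause) is the Python-2-compatible spelling of `raise exc from cause`: the SynapseError handed to the caller keeps the original failure as __cause__, so the traceback shows both, while the new SynapseError branch stops errors such as body-too-large from being re-wrapped as 502s. A minimal illustration with a hypothetical error class:

    from six import raise_from

    class DownloadError(Exception):
        """Hypothetical stand-in for SynapseError."""

    def download():
        try:
            raise ValueError("response body exceeded max_size")
        except Exception as e:
            # Py3 equivalent: raise DownloadError(...) from e
            raise_from(
                DownloadError("Failed to download remote body: %s" % e), e
            )

    # download()  # raises DownloadError with the ValueError as __cause__
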
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef48984fdd..0d3ae1a43d 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -25,7 +25,7 @@ import six
import attr
from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
from twisted.internet import reactor
@@ -40,7 +40,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class RegistryProxy(object):
-
@staticmethod
def collect():
for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ class LaterGauge(object):
try:
calls = self.caller()
except Exception:
- logger.exception(
- "Exception running callback for LaterGauge(%s)",
- self.name,
- )
+ logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
@@ -116,9 +112,7 @@ class InFlightGauge(object):
# Create a class which has the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
- "_MetricsEntry",
- attrs={x: attr.ib(0) for x in sub_metrics},
- slots=True,
+ "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
)
# Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ class InFlightGauge(object):
Note: may be called by a separate thread.
"""
- in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+ in_flight = GaugeMetricFamily(
+ self.name + "_total", self.desc, labels=self.labels
+ )
metrics_by_key = {}
@@ -179,7 +175,9 @@ class InFlightGauge(object):
yield in_flight
for name in self.sub_metrics:
- gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+ gauge = GaugeMetricFamily(
+ "_".join([self.name, name]), "", labels=self.labels
+ )
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
@@ -193,12 +191,75 @@ class InFlightGauge(object):
all_gauges[self.name] = self
+@attr.s(hash=True)
+class BucketCollector(object):
+ """
+ Like a Histogram, but allows buckets to be point-in-time instead of
+ incrementally added to.
+
+ Args:
+ name (str): Base name of metric to be exported to Prometheus.
+ data_collector (callable -> dict): A synchronous callable that
+ returns a dict mapping bucket to number of items in the
+ bucket. If these buckets are not the same as the buckets
+ given to this class, they will be remapped into them.
+ buckets (list[float]): List of floats/ints of the buckets to
+ give to Prometheus. +Inf is ignored, if given.
+
+ """
+
+ name = attr.ib()
+ data_collector = attr.ib()
+ buckets = attr.ib()
+
+ def collect(self):
+
+ # Fetch the data -- this must be synchronous!
+ data = self.data_collector()
+
+ buckets = {}
+
+ res = []
+ for x in data.keys():
+ for i, bound in enumerate(self.buckets):
+ if x <= bound:
+ buckets[bound] = buckets.get(bound, 0) + data[x]
+ break
+
+ for i in self.buckets:
+ res.append([str(i), buckets.get(i, 0)])
+
+ res.append(["+Inf", sum(data.values())])
+
+ metric = HistogramMetricFamily(
+ self.name,
+ "",
+ buckets=res,
+ sum_value=sum([x * y for x, y in data.items()]),
+ )
+ yield metric
+
+ def __attrs_post_init__(self):
+ self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
+ if self.buckets != sorted(self.buckets):
+ raise ValueError("Buckets not sorted")
+
+ self.buckets = tuple(self.buckets)
+
+ if self.name in all_gauges.keys():
+ logger.warning("%s already registered, reregistering" % (self.name,))
+ REGISTRY.unregister(all_gauges.pop(self.name))
+
+ REGISTRY.register(self)
+ all_gauges[self.name] = self
+
+
#
# Detailed CPU metrics
#
class CPUMetrics(object):
-
def __init__(self):
ticks_per_sec = 100
try:
@@ -237,13 +298,28 @@ gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
- buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
- 5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+ buckets=[
+ 0.0025,
+ 0.005,
+ 0.01,
+ 0.025,
+ 0.05,
+ 0.10,
+ 0.25,
+ 0.50,
+ 1.00,
+ 2.50,
+ 5.00,
+ 7.50,
+ 15.00,
+ 30.00,
+ 45.00,
+ 60.00,
+ ],
)
class GCCounts(object):
-
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
@@ -279,9 +355,7 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
event_processing_loop_counter = Counter(
- "synapse_event_processing_loop_count",
- "Event processing loop iterations",
- ["name"],
+ "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
)
event_processing_loop_room_count = Counter(
@@ -311,7 +385,6 @@ last_ticked = time.time()
class ReactorLastSeenMetric(object):
-
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
@@ -325,7 +398,6 @@ REGISTRY.register(ReactorLastSeenMetric())
def runUntilCurrentTimer(func):
-
@functools.wraps(func)
def f(*args, **kwargs):
now = reactor.seconds()
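
A point-in-time collector like BucketCollector inverts the usual Histogram flow: instead of observing samples as they happen, it snapshots a {value: count} dict on every scrape and remaps the keys into the configured buckets. A hypothetical usage sketch (assuming BucketCollector is importable from synapse.metrics):

    from synapse.metrics import BucketCollector

    # data_collector runs synchronously on every scrape, so it must be
    # cheap and must not block on the reactor.
    current_counts = {1: 12, 4: 3, 250: 1}  # value -> number of items

    BucketCollector(
        "example_distribution",
        lambda: current_counts,
        buckets=[1, 2, 5, 10, 50, 100, "+Inf"],  # "+Inf" is stripped
    )
    # Instantiation registers with REGISTRY (see __attrs_post_init__);
    # each scrape yields a histogram with each count placed in the
    # first bucket whose bound is >= the value.
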
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 7dfa78dadb..11ace2bfb1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -44,7 +44,7 @@ REQUIREMENTS = [
"canonicaljson>=1.1.3",
"signedjson>=1.0.0",
"pynacl>=1.2.1",
- "idna>=2",
+ "idna>=2.5",
# validating SSL certs for IP addresses requires service_identity 18.1.
"service_identity>=18.1.0",
@@ -65,7 +65,7 @@ REQUIREMENTS = [
"sortedcontainers>=1.4.4",
"psutil>=2.0.0",
"pymacaroons>=0.13.0",
- "msgpack>=0.5.0",
+ "msgpack>=0.5.2",
"phonenumbers>=8.2.0",
"six>=1.10",
# prometheus_client 0.4.0 changed the format of counter metrics
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 974465e90c..b88e58611c 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -69,7 +69,13 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
if self.config.email_password_reset_behaviour == "off":
- raise SynapseError(400, "Password resets have been disabled on this server")
+ if self.config.password_resets_were_disabled_due_to_email_config:
+ logger.warn(
+ "User password resets have been disabled due to lack of email config"
+ )
+ raise SynapseError(
+ 400, "Email-based password resets have been disabled on this server",
+ )
body = parse_json_object_from_request(request)
@@ -199,9 +205,6 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
- if not self.config.email_password_reset_behaviour == "off":
- raise SynapseError(400, "Password resets have been disabled on this server")
-
body = parse_json_object_from_request(request)
assert_params_in_dict(body, [
@@ -258,6 +261,14 @@ class PasswordResetSubmitTokenServlet(RestServlet):
400,
"This medium is currently not supported for password resets",
)
+ if self.config.email_password_reset_behaviour == "off":
+ if self.config.password_resets_were_disabled_due_to_email_config:
+ logger.warn(
+ "User password resets have been disabled due to lack of email config"
+ )
+ raise SynapseError(
+ 400, "Email-based password resets have been disabled on this server",
+ )
sid = parse_string(request, "sid")
client_secret = parse_string(request, "client_secret")
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 8569677355..a4929dd5db 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -386,8 +386,10 @@ class MediaRepository(object):
raise SynapseError(502, "Failed to fetch remote media")
except SynapseError:
- logger.exception("Failed to fetch remote media %s/%s",
- server_name, media_id)
+ logger.warn(
+ "Failed to fetch remote media %s/%s",
+ server_name, media_id,
+ )
raise
except NotRetryingDestination:
logger.warn("Not retrying destination %r", server_name)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 71316f7d09..0ca6f6121f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,23 +279,37 @@ class DataStore(
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
+ yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+ return self.runInteraction("count_daily_users", self._count_users, yesterday)
- def _count_users(txn):
- yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-
- sql = """
- SELECT COALESCE(count(*), 0) FROM (
- SELECT user_id FROM user_ips
- WHERE last_seen > ?
- GROUP BY user_id
- ) u
- """
-
- txn.execute(sql, (yesterday,))
- count, = txn.fetchone()
- return count
+ def count_monthly_users(self):
+ """
+ Counts the number of users who used this homeserver in the last 30 days.
+ Note this method is intended for phonehome metrics only and is different
+ from the mau figure in synapse.storage.monthly_active_users which,
+ amongst other things, includes a 3 day grace period before a user counts.
+ """
+ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+ return self.runInteraction(
+ "count_monthly_users",
+ self._count_users,
+ thirty_days_ago,
+ )
- return self.runInteraction("count_users", _count_users)
+ def _count_users(self, txn, time_from):
+ """
+ Returns number of users seen in the past time_from period
+ """
+ sql = """
+ SELECT COALESCE(count(*), 0) FROM (
+ SELECT user_id FROM user_ips
+ WHERE last_seen > ?
+ GROUP BY user_id
+ ) u
+ """
+ txn.execute(sql, (time_from,))
+ count, = txn.fetchone()
+ return count
def count_r30_users(self):
"""
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bc3e6de3bf..5a6f4e9e59 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,7 +17,7 @@
import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, text_type
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,38 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
-
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
+ # Collect metrics on the number of forward extremities that exist.
+ self._current_forward_extremities_amount = {}
+
+ BucketCollector(
+ "synapse_forward_extremities",
+ lambda: self._current_forward_extremities_amount,
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
+ )
+
+ # Read the extrems every 60 minutes
+ hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
+ def _read_forward_extremities(self):
+ def fetch(txn):
+ txn.execute(
+ """
+ select count(*) c from event_forward_extremities
+ group by room_id
+ """
+ )
+ return txn.fetchall()
+
+ res = yield self.runInteraction("read_forward_extremities", fetch)
+ self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
@@ -568,17 +594,11 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(
- r[0]
- for r in txn
- if not json.loads(r[1]).get("soft_failed")
- )
+ results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_events_which_are_prevs",
- _get_events_which_are_prevs_txn,
- chunk,
+ "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
defer.returnValue(results)
@@ -640,9 +660,7 @@ class EventsStore(
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_prevs_before_rejected",
- _get_prevs_before_rejected_txn,
- chunk,
+ "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
defer.returnValue(existing_prevs)
|