diff --git a/CHANGES.rst b/CHANGES.rst
index 8c180750ad..32f18e7098 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,126 @@
+Changes in synapse v0.16.0 (2016-06-09)
+=======================================
+
+NB: As of v0.14 all AS config files must have an ID field.
+
+
+Bug fixes:
+
+* Don't make rooms published by default (PR #857)
+
+Changes in synapse v0.16.0-rc2 (2016-06-08)
+===========================================
+
+Features:
+
+* Add configuration option for tuning GC via ``gc.set_threshold`` (PR #849)
+
+Changes:
+
+* Record metrics about GC (PR #771, #847, #852)
+* Add metric counter for number of persisted events (PR #841)
+
+Bug fixes:
+
+* Fix 'From' header in email notifications (PR #843)
+* Fix presence where timeouts were not being fired for the first 8h after
+ restarts (PR #842)
+* Fix bug where synapse sent malformed transactions to AS's when retrying
+ transactions (Commits 310197b, 8437906)
+
+Performance Improvements:
+
+* Remove event fetching from DB threads (PR #835)
+* Change the way we cache events (PR #836)
+* Add events to cache when we persist them (PR #840)
+
+
+Changes in synapse v0.16.0-rc1 (2016-06-03)
+===========================================
+
+Version 0.15 was not released. See v0.15.0-rc1 below for additional changes.
+
+Features:
+
+* Add email notifications for missed messages (PR #759, #786, #799, #810, #815,
+ #821)
+* Add a ``url_preview_ip_range_whitelist`` config param (PR #760)
+* Add /report endpoint (PR #762)
+* Add basic ignore user API (PR #763)
+* Add an openidish mechanism for proving that you own a given user_id (PR #765)
+* Allow clients to specify a server_name to avoid 'No known servers' (PR #794)
+* Add secondary_directory_servers option to fetch room list from other servers
+ (PR #808, #813)
+
+Changes:
+
+* Report per request metrics for all of the things using request_handler (PR
+ #756)
+* Correctly handle ``NULL`` password hashes from the database (PR #775)
+* Allow receipts for events we haven't seen in the db (PR #784)
+* Make synctl read a cache factor from config file (PR #785)
+* Increment badge count per missed convo, not per msg (PR #793)
+* Special case m.room.third_party_invite event auth to match invites (PR #814)
+
+
+Bug fixes:
+
+* Fix typo in event_auth servlet path (PR #757)
+* Fix password reset (PR #758)
+
+
+Performance improvements:
+
+* Reduce database inserts when sending transactions (PR #767)
+* Queue events by room for persistence (PR #768)
+* Add cache to ``get_user_by_id`` (PR #772)
+* Add and use ``get_domain_from_id`` (PR #773)
+* Use tree cache for ``get_linearized_receipts_for_room`` (PR #779)
+* Remove unused indices (PR #782)
+* Add caches to ``bulk_get_push_rules*`` (PR #804)
+* Cache ``get_event_reference_hashes`` (PR #806)
+* Add ``get_users_with_read_receipts_in_room`` cache (PR #809)
+* Use state to calculate ``get_users_in_room`` (PR #811)
+* Load push rules in storage layer so that they get cached (PR #825)
+* Make ``get_joined_hosts_for_room`` use get_users_in_room (PR #828)
+* Poke notifier on next reactor tick (PR #829)
+* Change CacheMetrics to be quicker (PR #830)
+
+
+Changes in synapse v0.15.0-rc1 (2016-04-26)
+===========================================
+
+Features:
+
+* Add login support for Javascript Web Tokens, thanks to Niklas Riekenbrauck
+ (PR #671,#687)
+* Add URL previewing support (PR #688)
+* Add login support for LDAP, thanks to Christoph Witzany (PR #701)
+* Add GET endpoint for pushers (PR #716)
+
+Changes:
+
+* Never notify for member events (PR #667)
+* Deduplicate identical ``/sync`` requests (PR #668)
+* Require user to have left room to forget room (PR #673)
+* Use DNS cache if within TTL (PR #677)
+* Let users see their own leave events (PR #699)
+* Deduplicate membership changes (PR #700)
+* Increase performance of pusher code (PR #705)
+* Respond with error status 504 if failed to talk to remote server (PR #731)
+* Increase search performance on postgres (PR #745)
+
+Bug fixes:
+
+* Fix bug where disabling all notifications still resulted in push (PR #678)
+* Fix bug where users couldn't reject remote invites if remote refused (PR #691)
+* Fix bug where synapse attempted to backfill from itself (PR #693)
+* Fix bug where profile information was not correctly added when joining remote
+ rooms (PR #703)
+* Fix bug where register API required incorrect key name for AS registration
+ (PR #727)
+
+
Changes in synapse v0.14.0 (2016-03-30)
=======================================
@@ -511,7 +634,7 @@ Configuration:
* Add support for changing the bind host of the metrics listener via the
``metrics_bind_host`` option.
-
+
Changes in synapse v0.9.0-r5 (2015-05-21)
=========================================
@@ -853,7 +976,7 @@ See UPGRADE for information about changes to the client server API, including
breaking backwards compatibility with VoIP calls and registration API.
Homeserver:
- * When a user changes their displayname or avatar the server will now update
+ * When a user changes their displayname or avatar the server will now update
all their join states to reflect this.
* The server now adds "age" key to events to indicate how old they are. This
is clock independent, so at no point does any server or webclient have to
@@ -911,7 +1034,7 @@ Changes in synapse 0.2.2 (2014-09-06)
=====================================
Homeserver:
- * When the server returns state events it now also includes the previous
+ * When the server returns state events it now also includes the previous
content.
* Add support for inviting people when creating a new room.
* Make the homeserver inform the room via `m.room.aliases` when a new alias
@@ -923,7 +1046,7 @@ Webclient:
* Handle `m.room.aliases` events.
* Asynchronously send messages and show a local echo.
* Inform the UI when a message failed to send.
- * Only autoscroll on receiving a new message if the user was already at the
+ * Only autoscroll on receiving a new message if the user was already at the
bottom of the screen.
* Add support for ban/kick reasons.
diff --git a/README.rst b/README.rst
index 02e7c61d1e..ebcb15a977 100644
--- a/README.rst
+++ b/README.rst
@@ -58,12 +58,13 @@ the spec in the context of a codebase and let you run your own homeserver and
generally help bootstrap the ecosystem.
In Matrix, every user runs one or more Matrix clients, which connect through to
-a Matrix homeserver which stores all their personal chat history and user
-account information - much as a mail client connects through to an IMAP/SMTP
-server. Just like email, you can either run your own Matrix homeserver and
-control and own your own communications and history or use one hosted by
-someone else (e.g. matrix.org) - there is no single point of control or
-mandatory service provider in Matrix, unlike WhatsApp, Facebook, Hangouts, etc.
+a Matrix homeserver. The homeserver stores all their personal chat history and
+user account information - much as a mail client connects through to an
+IMAP/SMTP server. Just like email, you can either run your own Matrix
+homeserver and control and own your own communications and history or use one
+hosted by someone else (e.g. matrix.org) - there is no single point of control
+or mandatory service provider in Matrix, unlike WhatsApp, Facebook, Hangouts,
+etc.
Synapse ships with two basic demo Matrix clients: webclient (a basic group chat
web client demo implemented in AngularJS) and cmdclient (a basic Python
@@ -617,7 +618,7 @@ Building internal API documentation::
-Halp!! Synapse eats all my RAM!
+Help!! Synapse eats all my RAM!
===============================
Synapse's architecture is quite RAM hungry currently - we deliberately
diff --git a/contrib/systemd/synapse.service b/contrib/systemd/synapse.service
index 2e8cd21c9e..967a4debfd 100644
--- a/contrib/systemd/synapse.service
+++ b/contrib/systemd/synapse.service
@@ -9,6 +9,7 @@ Description=Synapse Matrix homeserver
Type=simple
User=synapse
Group=synapse
+EnvironmentFile=-/etc/sysconfig/synapse
WorkingDirectory=/var/lib/synapse
ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml --log-config=/etc/synapse/log_config.yaml
diff --git a/docs/application_services.rst b/docs/application_services.rst
index 7e87ac9ad6..fbc0c7e960 100644
--- a/docs/application_services.rst
+++ b/docs/application_services.rst
@@ -32,5 +32,4 @@ The format of the AS configuration file is as follows:
See the spec_ for further details on how application services work.
-.. _spec: https://github.com/matrix-org/matrix-doc/blob/master/specification/25_application_service_api.rst#application-service-api
-
+.. _spec: https://matrix.org/docs/spec/application_service/unstable.html
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
new file mode 100755
index 0000000000..d15836e6bf
--- /dev/null
+++ b/jenkins-dendron-postgres.sh
@@ -0,0 +1,85 @@
+#!/bin/bash
+
+set -eux
+
+: ${WORKSPACE:="$(pwd)"}
+
+export PYTHONDONTWRITEBYTECODE=yep
+export SYNAPSE_CACHE_FACTOR=1
+
+# Output test results as junit xml
+export TRIAL_FLAGS="--reporter=subunit"
+export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
+# Write coverage reports to a separate file for each process
+export COVERAGE_OPTS="-p"
+export DUMP_COVERAGE_COMMAND="coverage help"
+
+# Output flake8 violations to violations.flake8.log
+# Don't exit with non-0 status code on Jenkins,
+# so that the build steps continue and a later step can decided whether to
+# UNSTABLE or FAILURE this build.
+export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
+
+rm .coverage* || echo "No coverage files to remove"
+
+tox --notest -e py27
+
+TOX_BIN=$WORKSPACE/.tox/py27/bin
+python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
+$TOX_BIN/pip install psycopg2
+$TOX_BIN/pip install lxml
+
+: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
+
+if [[ ! -e .dendron-base ]]; then
+ git clone https://github.com/matrix-org/dendron.git .dendron-base --mirror
+else
+ (cd .dendron-base; git fetch -p)
+fi
+
+rm -rf dendron
+git clone .dendron-base dendron --shared
+cd dendron
+
+: ${GOPATH:=${WORKSPACE}/.gopath}
+if [[ "${GOPATH}" != *:* ]]; then
+ mkdir -p "${GOPATH}"
+ export PATH="${GOPATH}/bin:${PATH}"
+fi
+export GOPATH
+
+git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
+
+go get github.com/constabulary/gb/...
+gb generate
+gb build
+
+cd ..
+
+
+if [[ ! -e .sytest-base ]]; then
+ git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror
+else
+ (cd .sytest-base; git fetch -p)
+fi
+
+rm -rf sytest
+git clone .sytest-base sytest --shared
+cd sytest
+
+git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
+
+: ${PORT_BASE:=8000}
+
+./jenkins/prep_sytest_for_postgres.sh
+
+mkdir -p var
+
+echo >&2 "Running sytest with PostgreSQL";
+./jenkins/install_and_run.sh --python $TOX_BIN/python \
+ --synapse-directory $WORKSPACE \
+ --dendron $WORKSPACE/dendron/bin/dendron \
+ --pusher \
+ --port-base $PORT_BASE
+
+cd ..
diff --git a/res/templates/mail.css b/res/templates/mail.css
index f2b5e84abc..5ab3e1b06d 100644
--- a/res/templates/mail.css
+++ b/res/templates/mail.css
@@ -145,6 +145,11 @@ pre, code {
text-decoration: none;
}
+.debug {
+ font-size: 10px;
+ color: #888;
+}
+
.footer {
margin-top: 20px;
text-align: center;
diff --git a/res/templates/notif.html b/res/templates/notif.html
index 834840861e..88b921ca9c 100644
--- a/res/templates/notif.html
+++ b/res/templates/notif.html
@@ -17,11 +17,15 @@
</td>
<td class="message_contents">
{% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %}
- <div class="sender_name">{{ message.sender_name }}</div>
+ <div class="sender_name">{% if message.msgtype == "m.emote" %}*{% endif %} {{ message.sender_name }}</div>
{% endif %}
<div class="message_body">
{% if message.msgtype == "m.text" %}
{{ message.body_text_html }}
+ {% elif message.msgtype == "m.emote" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.notice" %}
+ {{ message.body_text_html }}
{% elif message.msgtype == "m.image" %}
<img src="{{ message.image_url|mxc_to_http(640, 480, scale) }}" />
{% elif message.msgtype == "m.file" %}
diff --git a/res/templates/notif.txt b/res/templates/notif.txt
index a3ddac80ce..a37bee9833 100644
--- a/res/templates/notif.txt
+++ b/res/templates/notif.txt
@@ -1,7 +1,11 @@
{% for message in notif.messages %}
-{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
+{% if message.msgtype == "m.emote" %}* {% endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
{% if message.msgtype == "m.text" %}
{{ message.body_text_plain }}
+{% elif message.msgtype == "m.emote" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.notice" %}
+{{ message.body_text_plain }}
{% elif message.msgtype == "m.image" %}
{{ message.body_text_plain }}
{% elif message.msgtype == "m.file" %}
diff --git a/res/templates/notif_mail.html b/res/templates/notif_mail.html
index dc13398df1..8aee68b591 100644
--- a/res/templates/notif_mail.html
+++ b/res/templates/notif_mail.html
@@ -30,18 +30,20 @@
{% include 'room.html' with context %}
{% endfor %}
<div class="footer">
- <small>
- Sending email at {{ reason.now|format_ts("%c") }} due to activity in room '{{ reason.room_name }}' because:<br/>
- 1. An event was received at {{ reason.received_at|format_ts("%c") }}
- which is more than {{ "%.1f"|format(reason.delay_before_mail_ms / (60*1000)) }} (delay_before_mail_ms) mins ago.<br/>
+ <a href="{{ unsubscribe_link }}">Unsubscribe</a>
+ <br/>
+ <br/>
+ <div class="debug">
+ Sending email at {{ reason.now|format_ts("%c") }} due to activity in room {{ reason.room_name }} because
+ an event was received at {{ reason.received_at|format_ts("%c") }}
+ which is more than {{ "%.1f"|format(reason.delay_before_mail_ms / (60*1000)) }} (delay_before_mail_ms) mins ago,
{% if reason.last_sent_ts %}
- 2. The last time we sent a mail for this room was {{ reason.last_sent_ts|format_ts("%c") }},
+ and the last time we sent a mail for this room was {{ reason.last_sent_ts|format_ts("%c") }},
which is more than {{ "%.1f"|format(reason.throttle_ms / (60*1000)) }} (current throttle_ms) mins ago.
{% else %}
- 2. We can't remember the last time we sent a mail for this room.
+ and we don't have a last time we sent a mail for this room.
{% endif %}
- </small>
- <a href="{{ unsubscribe_link }}">Unsubscribe</a>
+ </div>
</div>
</td>
<td> </td>
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 7de51fbe8d..dc211e9637 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.14.0"
+__version__ = "0.16.0"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2474a1453b..31e1abb964 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""This module contains classes for authenticating the user."""
from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json, SignatureVerifyException
@@ -42,13 +41,20 @@ AuthEventTypes = (
class Auth(object):
-
+ """
+ FIXME: This class contains a mix of functions for authenticating users
+ of our client-server API and authenticating events added to room graphs.
+ """
def __init__(self, hs):
self.hs = hs
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
+ # Docs for these currently lives at
+ # https://github.com/matrix-org/matrix-doc/blob/master/drafts/macaroons_caveats.rst
+ # In addition, we have type == delete_pusher which grants access only to
+ # delete pushers.
self._KNOWN_CAVEAT_PREFIXES = set([
"gen = ",
"guest = ",
@@ -120,6 +126,24 @@ class Auth(object):
return allowed
self.check_event_sender_in_room(event, auth_events)
+
+ # Special case to allow m.room.third_party_invite events wherever
+ # a user is allowed to issue invites. Fixes
+ # https://github.com/vector-im/vector-web/issues/1208 hopefully
+ if event.type == EventTypes.ThirdPartyInvite:
+ user_level = self._get_user_power_level(event.user_id, auth_events)
+ invite_level = self._get_named_level(auth_events, "invite", 0)
+
+ if user_level < invite_level:
+ raise AuthError(
+ 403, (
+ "You cannot issue a third party invite for %s." %
+ (event.content.display_name,)
+ )
+ )
+ else:
+ return True
+
self._can_send_event(event, auth_events)
if event.type == EventTypes.PowerLevels:
@@ -507,7 +531,7 @@ class Auth(object):
return default
@defer.inlineCallbacks
- def get_user_by_req(self, request, allow_guest=False):
+ def get_user_by_req(self, request, allow_guest=False, rights="access"):
""" Get a registered user's ID.
Args:
@@ -529,7 +553,7 @@ class Auth(object):
)
access_token = request.args["access_token"][0]
- user_info = yield self.get_user_by_access_token(access_token)
+ user_info = yield self.get_user_by_access_token(access_token, rights)
user = user_info["user"]
token_id = user_info["token_id"]
is_guest = user_info["is_guest"]
@@ -590,7 +614,7 @@ class Auth(object):
defer.returnValue(user_id)
@defer.inlineCallbacks
- def get_user_by_access_token(self, token):
+ def get_user_by_access_token(self, token, rights="access"):
""" Get a registered user's ID.
Args:
@@ -601,7 +625,7 @@ class Auth(object):
AuthError if no user by that token exists or the token is invalid.
"""
try:
- ret = yield self.get_user_from_macaroon(token)
+ ret = yield self.get_user_from_macaroon(token, rights)
except AuthError:
# TODO(daniel): Remove this fallback when all existing access tokens
# have been re-issued as macaroons.
@@ -609,11 +633,11 @@ class Auth(object):
defer.returnValue(ret)
@defer.inlineCallbacks
- def get_user_from_macaroon(self, macaroon_str):
+ def get_user_from_macaroon(self, macaroon_str, rights="access"):
try:
macaroon = pymacaroons.Macaroon.deserialize(macaroon_str)
- self.validate_macaroon(macaroon, "access", self.hs.config.expire_access_token)
+ self.validate_macaroon(macaroon, rights, self.hs.config.expire_access_token)
user_prefix = "user_id = "
user = None
@@ -636,6 +660,13 @@ class Auth(object):
"is_guest": True,
"token_id": None,
}
+ elif rights == "delete_pusher":
+ # We don't store these tokens in the database
+ ret = {
+ "user": user,
+ "is_guest": False,
+ "token_id": None,
+ }
else:
# This codepath exists so that we can actually return a
# token ID, because we use token IDs in place of device
@@ -667,7 +698,8 @@ class Auth(object):
Args:
macaroon(pymacaroons.Macaroon): The macaroon to validate
- type_string(str): The kind of token this is (e.g. "access", "refresh")
+ type_string(str): The kind of token required (e.g. "access", "refresh",
+ "delete_pusher")
verify_expiry(bool): Whether to verify whether the macaroon has expired.
This should really always be True, but no clients currently implement
token refresh, so we can't enforce expiry yet.
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index df675c0ed4..40ffd9bf0d 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -16,6 +16,7 @@
import synapse
+import gc
import logging
import os
import sys
@@ -265,10 +266,9 @@ def setup(config_options):
HomeServer
"""
try:
- config = HomeServerConfig.load_config(
+ config = HomeServerConfig.load_or_generate_config(
"Synapse Homeserver",
config_options,
- generate_section="Homeserver"
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
@@ -351,6 +351,8 @@ class SynapseService(service.Service):
def startService(self):
hs = setup(self.config)
change_resource_limit(hs.config.soft_file_limit)
+ if hs.config.gc_thresholds:
+ gc.set_threshold(*hs.config.gc_thresholds)
def stopService(self):
return self._port.stopListening()
@@ -422,6 +424,8 @@ def run(hs):
# sys.settrace(logcontext_tracer)
with LoggingContext("run"):
change_resource_limit(hs.config.soft_file_limit)
+ if hs.config.gc_thresholds:
+ gc.set_threshold(*hs.config.gc_thresholds)
reactor.run()
if hs.config.daemonize:
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 135dd58c15..4ec23d84c1 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -21,6 +21,7 @@ from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
from synapse.config.emailconfig import EmailConfig
+from synapse.config.key import KeyConfig
from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.storage.roommember import RoomMemberStore
@@ -42,6 +43,7 @@ from twisted.web.resource import Resource
from daemonize import Daemonize
+import gc
import sys
import logging
@@ -63,6 +65,40 @@ class SlaveConfig(DatabaseConfig):
self.pid_file = self.abspath(config.get("pid_file"))
self.public_baseurl = config["public_baseurl"]
+ thresholds = config.get("gc_thresholds", None)
+ if thresholds is not None:
+ try:
+ assert len(thresholds) == 3
+ self.gc_thresholds = (
+ int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+ )
+ except:
+ raise ConfigError(
+ "Value of `gc_threshold` must be a list of three integers if set"
+ )
+ else:
+ self.gc_thresholds = None
+
+ # some things used by the auth handler but not actually used in the
+ # pusher codebase
+ self.bcrypt_rounds = None
+ self.ldap_enabled = None
+ self.ldap_server = None
+ self.ldap_port = None
+ self.ldap_tls = None
+ self.ldap_search_base = None
+ self.ldap_search_property = None
+ self.ldap_email_property = None
+ self.ldap_full_name_property = None
+
+ # We would otherwise try to use the registration shared secret as the
+ # macaroon shared secret if there was no macaroon_shared_secret, but
+ # that means pulling in RegistrationConfig too. We don't need to be
+ # backwards compaitible in the pusher codebase so just make people set
+ # macaroon_shared_secret. We set this to None to prevent it referencing
+ # an undefined key.
+ self.registration_shared_secret = None
+
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("pusher.pid")
return """\
@@ -95,7 +131,7 @@ class SlaveConfig(DatabaseConfig):
""" % locals()
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
+class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
pass
@@ -290,7 +326,7 @@ class PusherServer(HomeServer):
poke_pushers(result)
except:
logger.exception("Error replicating from %r", replication_url)
- sleep(30)
+ yield sleep(30)
def setup(config_options):
@@ -321,6 +357,8 @@ def setup(config_options):
ps.start_listening()
change_resource_limit(ps.config.soft_file_limit)
+ if ps.config.gc_thresholds:
+ gc.set_threshold(*ps.config.gc_thresholds)
def start():
ps.replicate()
@@ -340,6 +378,8 @@ if __name__ == '__main__':
def run():
with LoggingContext("run"):
change_resource_limit(ps.config.soft_file_limit)
+ if ps.config.gc_thresholds:
+ gc.set_threshold(*ps.config.gc_thresholds)
reactor.run()
daemon = Daemonize(
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
new file mode 100644
index 0000000000..297e199453
--- /dev/null
+++ b/synapse/app/synchrotron.py
@@ -0,0 +1,537 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.api.constants import EventTypes, PresenceState
+from synapse.config._base import ConfigError
+from synapse.config.database import DatabaseConfig
+from synapse.config.logger import LoggingConfig
+from synapse.config.appservice import AppServiceConfig
+from synapse.events import FrozenEvent
+from synapse.handlers.presence import PresenceHandler
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.rest.client.v2_alpha import sync
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import PresenceStore, UserPresenceState
+from synapse.storage.roommember import RoomMemberStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.stringutils import random_string
+from synapse.util.versionstring import get_version_string
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import contextlib
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.synchrotron")
+
+
+class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
+ def read_config(self, config):
+ self.replication_url = config["replication_url"]
+ self.server_name = config["server_name"]
+ self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
+ "use_insecure_ssl_client_just_for_testing_do_not_use", False
+ )
+ self.user_agent_suffix = None
+ self.listeners = config["listeners"]
+ self.soft_file_limit = config.get("soft_file_limit")
+ self.daemonize = config.get("daemonize")
+ self.pid_file = self.abspath(config.get("pid_file"))
+ self.macaroon_secret_key = config["macaroon_secret_key"]
+ self.expire_access_token = config.get("expire_access_token", False)
+
+ thresholds = config.get("gc_thresholds", None)
+ if thresholds is not None:
+ try:
+ assert len(thresholds) == 3
+ self.gc_thresholds = (
+ int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+ )
+ except:
+ raise ConfigError(
+ "Value of `gc_threshold` must be a list of three integers if set"
+ )
+ else:
+ self.gc_thresholds = None
+
+ def default_config(self, server_name, **kwargs):
+ pid_file = self.abspath("synchroton.pid")
+ return """\
+ # Slave configuration
+
+ # The replication listener on the synapse to talk to.
+ #replication_url: https://localhost:{replication_port}/_synapse/replication
+
+ server_name: "%(server_name)s"
+
+ listeners:
+ # Enable a /sync listener on the synchrontron
+ #- type: http
+ # port: {http_port}
+ # bind_address: ""
+ # Enable a ssh manhole listener on the synchrotron
+ # - type: manhole
+ # port: {manhole_port}
+ # bind_address: 127.0.0.1
+ # Enable a metric listener on the synchrotron
+ # - type: http
+ # port: {metrics_port}
+ # bind_address: 127.0.0.1
+ # resources:
+ # - names: ["metrics"]
+ # compress: False
+
+ report_stats: False
+
+ daemonize: False
+
+ pid_file: %(pid_file)s
+ """ % locals()
+
+
+class SynchrotronSlavedStore(
+ SlavedPushRuleStore,
+ SlavedEventStore,
+ SlavedReceiptsStore,
+ SlavedAccountDataStore,
+ SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+ SlavedFilteringStore,
+ SlavedPresenceStore,
+ BaseSlavedStore,
+ ClientIpStore, # After BaseSlavedStore because the constructor is different
+):
+ # XXX: This is a bit broken because we don't persist forgotten rooms
+ # in a way that they can be streamed. This means that we don't have a
+ # way to invalidate the forgotten rooms cache correctly.
+ # For now we expire the cache every 10 minutes.
+ BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
+ who_forgot_in_room = (
+ RoomMemberStore.__dict__["who_forgot_in_room"]
+ )
+
+ # XXX: This is a bit broken because we don't persist the accepted list in a
+ # way that can be replicated. This means that we don't have a way to
+ # invalidate the cache correctly.
+ get_presence_list_accepted = PresenceStore.__dict__[
+ "get_presence_list_accepted"
+ ]
+
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
+
+class SynchrotronPresence(object):
+ def __init__(self, hs):
+ self.http_client = hs.get_simple_http_client()
+ self.store = hs.get_datastore()
+ self.user_to_num_current_syncs = {}
+ self.syncing_users_url = hs.config.replication_url + "/syncing_users"
+ self.clock = hs.get_clock()
+
+ active_presence = self.store.take_presence_startup_info()
+ self.user_to_current_state = {
+ state.user_id: state
+ for state in active_presence
+ }
+
+ self.process_id = random_string(16)
+ logger.info("Presence process_id is %r", self.process_id)
+
+ self._sending_sync = False
+ self._need_to_send_sync = False
+ self.clock.looping_call(
+ self._send_syncing_users_regularly,
+ UPDATE_SYNCING_USERS_MS,
+ )
+
+ reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+
+ def set_state(self, user, state):
+ # TODO Hows this supposed to work?
+ pass
+
+ get_states = PresenceHandler.get_states.__func__
+ current_state_for_users = PresenceHandler.current_state_for_users.__func__
+
+ @defer.inlineCallbacks
+ def user_syncing(self, user_id, affect_presence):
+ if affect_presence:
+ curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+ self.user_to_num_current_syncs[user_id] = curr_sync + 1
+ prev_states = yield self.current_state_for_users([user_id])
+ if prev_states[user_id].state == PresenceState.OFFLINE:
+ # TODO: Don't block the sync request on this HTTP hit.
+ yield self._send_syncing_users_now()
+
+ def _end():
+ # We check that the user_id is in user_to_num_current_syncs because
+ # user_to_num_current_syncs may have been cleared if we are
+ # shutting down.
+ if affect_presence and user_id in self.user_to_num_current_syncs:
+ self.user_to_num_current_syncs[user_id] -= 1
+
+ @contextlib.contextmanager
+ def _user_syncing():
+ try:
+ yield
+ finally:
+ _end()
+
+ defer.returnValue(_user_syncing())
+
+ @defer.inlineCallbacks
+ def _on_shutdown(self):
+ # When the synchrotron is shutdown tell the master to clear the in
+ # progress syncs for this process
+ self.user_to_num_current_syncs.clear()
+ yield self._send_syncing_users_now()
+
+ def _send_syncing_users_regularly(self):
+ # Only send an update if we aren't in the middle of sending one.
+ if not self._sending_sync:
+ preserve_fn(self._send_syncing_users_now)()
+
+ @defer.inlineCallbacks
+ def _send_syncing_users_now(self):
+ if self._sending_sync:
+ # We don't want to race with sending another update.
+ # Instead we wait for that update to finish and send another
+ # update afterwards.
+ self._need_to_send_sync = True
+ return
+
+ # Flag that we are sending an update.
+ self._sending_sync = True
+
+ yield self.http_client.post_json_get_json(self.syncing_users_url, {
+ "process_id": self.process_id,
+ "syncing_users": [
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count > 0
+ ],
+ })
+
+ # Unset the flag as we are no longer sending an update.
+ self._sending_sync = False
+ if self._need_to_send_sync:
+ # If something happened while we were sending the update then
+ # we might need to send another update.
+ # TODO: Check if the update that was sent matches the current state
+ # as we only need to send an update if they are different.
+ self._need_to_send_sync = False
+ yield self._send_syncing_users_now()
+
+ def process_replication(self, result):
+ stream = result.get("presence", {"rows": []})
+ for row in stream["rows"]:
+ (
+ position, user_id, state, last_active_ts,
+ last_federation_update_ts, last_user_sync_ts, status_msg,
+ currently_active
+ ) = row
+ self.user_to_current_state[user_id] = UserPresenceState(
+ user_id, state, last_active_ts,
+ last_federation_update_ts, last_user_sync_ts, status_msg,
+ currently_active
+ )
+
+
+class SynchrotronTyping(object):
+ def __init__(self, hs):
+ self._latest_room_serial = 0
+ self._room_serials = {}
+ self._room_typing = {}
+
+ def stream_positions(self):
+ return {"typing": self._latest_room_serial}
+
+ def process_replication(self, result):
+ stream = result.get("typing")
+ if stream:
+ self._latest_room_serial = int(stream["position"])
+
+ for row in stream["rows"]:
+ position, room_id, typing_json = row
+ typing = json.loads(typing_json)
+ self._room_serials[room_id] = position
+ self._room_typing[room_id] = typing
+
+
+class SynchrotronApplicationService(object):
+ def notify_interested_services(self, event):
+ pass
+
+
+class SynchrotronServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+ elif name == "client":
+ resource = JsonResource(self, canonical_json=False)
+ sync.register_servlets(self, resource)
+ resources.update({
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ })
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse synchrotron now listening on port %d", port)
+
+ def start_listening(self):
+ for listener in self.config.listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.replication_url
+ clock = self.get_clock()
+ notifier = self.get_notifier()
+ presence_handler = self.get_presence_handler()
+ typing_handler = self.get_typing_handler()
+
+ def expire_broken_caches():
+ store.who_forgot_in_room.invalidate_all()
+ store.get_presence_list_accepted.invalidate_all()
+
+ def notify_from_stream(
+ result, stream_name, stream_key, room=None, user=None
+ ):
+ stream = result.get(stream_name)
+ if stream:
+ position_index = stream["field_names"].index("position")
+ if room:
+ room_index = stream["field_names"].index(room)
+ if user:
+ user_index = stream["field_names"].index(user)
+
+ users = ()
+ rooms = ()
+ for row in stream["rows"]:
+ position = row[position_index]
+
+ if user:
+ users = (row[user_index],)
+
+ if room:
+ rooms = (row[room_index],)
+
+ notifier.on_new_event(
+ stream_key, position, users=users, rooms=rooms
+ )
+
+ def notify(result):
+ stream = result.get("events")
+ if stream:
+ max_position = stream["position"]
+ for row in stream["rows"]:
+ position = row[0]
+ internal = json.loads(row[1])
+ event_json = json.loads(row[2])
+ event = FrozenEvent(event_json, internal_metadata_dict=internal)
+ extra_users = ()
+ if event.type == EventTypes.Member:
+ extra_users = (event.state_key,)
+ notifier.on_new_room_event(
+ event, position, max_position, extra_users
+ )
+
+ notify_from_stream(
+ result, "push_rules", "push_rules_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "user_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "room_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "tag_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "receipts", "receipt_key", room="room_id"
+ )
+ notify_from_stream(
+ result, "typing", "typing_key", room="room_id"
+ )
+
+ next_expire_broken_caches_ms = 0
+ while True:
+ try:
+ args = store.stream_positions()
+ args.update(typing_handler.stream_positions())
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ now_ms = clock.time_msec()
+ if now_ms > next_expire_broken_caches_ms:
+ expire_broken_caches()
+ next_expire_broken_caches_ms = (
+ now_ms + store.BROKEN_CACHE_EXPIRY_MS
+ )
+ yield store.process_replication(result)
+ typing_handler.process_replication(result)
+ presence_handler.process_replication(result)
+ notify(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(5)
+
+ def build_presence_handler(self):
+ return SynchrotronPresence(self)
+
+ def build_typing_handler(self):
+ return SynchrotronTyping(self)
+
+
+def setup(config_options):
+ try:
+ config = SynchrotronConfig.load_config(
+ "Synapse synchrotron", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ if not config:
+ sys.exit(0)
+
+ config.setup_logging()
+
+ database_engine = create_engine(config.database_config)
+
+ ss = SynchrotronServer(
+ config.server_name,
+ db_config=config.database_config,
+ config=config,
+ version_string=get_version_string("Synapse", synapse),
+ database_engine=database_engine,
+ application_service_handler=SynchrotronApplicationService(),
+ )
+
+ ss.setup()
+ ss.start_listening()
+
+ change_resource_limit(ss.config.soft_file_limit)
+ if ss.config.gc_thresholds:
+ ss.set_threshold(*ss.config.gc_thresholds)
+
+ def start():
+ ss.get_datastore().start_profiling()
+ ss.replicate()
+
+ reactor.callWhenRunning(start)
+
+ return ss
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ ss = setup(sys.argv[1:])
+
+ if ss.config.daemonize:
+ def run():
+ with LoggingContext("run"):
+ change_resource_limit(ss.config.soft_file_limit)
+ if ss.config.gc_thresholds:
+ gc.set_threshold(*ss.config.gc_thresholds)
+ reactor.run()
+
+ daemon = Daemonize(
+ app="synapse-synchrotron",
+ pid=ss.config.pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+
+ daemon.start()
+ else:
+ reactor.run()
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 47a4e9f864..9afc8fd754 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -56,22 +56,22 @@ import logging
logger = logging.getLogger(__name__)
-class AppServiceScheduler(object):
+class ApplicationServiceScheduler(object):
""" Public facing API for this module. Does the required DI to tie the
components together. This also serves as the "event_pool", which in this
case is a simple array.
"""
- def __init__(self, clock, store, as_api):
- self.clock = clock
- self.store = store
- self.as_api = as_api
+ def __init__(self, hs):
+ self.clock = hs.get_clock()
+ self.store = hs.get_datastore()
+ self.as_api = hs.get_application_service_api()
def create_recoverer(service, callback):
- return _Recoverer(clock, store, as_api, service, callback)
+ return _Recoverer(self.clock, self.store, self.as_api, service, callback)
self.txn_ctrl = _TransactionController(
- clock, store, as_api, create_recoverer
+ self.clock, self.store, self.as_api, create_recoverer
)
self.queuer = _ServiceQueuer(self.txn_ctrl)
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 7449f36491..af9f17bf7b 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -157,9 +157,40 @@ class Config(object):
return default_config, config
@classmethod
- def load_config(cls, description, argv, generate_section=None):
+ def load_config(cls, description, argv):
+ config_parser = argparse.ArgumentParser(
+ description=description,
+ )
+ config_parser.add_argument(
+ "-c", "--config-path",
+ action="append",
+ metavar="CONFIG_FILE",
+ help="Specify config file. Can be given multiple times and"
+ " may specify directories containing *.yaml files."
+ )
+
+ config_parser.add_argument(
+ "--keys-directory",
+ metavar="DIRECTORY",
+ help="Where files such as certs and signing keys are stored when"
+ " their location is given explicitly in the config."
+ " Defaults to the directory containing the last config file",
+ )
+
+ config_args = config_parser.parse_args(argv)
+
+ config_files = find_config_files(search_paths=config_args.config_path)
+
obj = cls()
+ obj.read_config_files(
+ config_files,
+ keys_directory=config_args.keys_directory,
+ generate_keys=False,
+ )
+ return obj
+ @classmethod
+ def load_or_generate_config(cls, description, argv):
config_parser = argparse.ArgumentParser(add_help=False)
config_parser.add_argument(
"-c", "--config-path",
@@ -176,7 +207,7 @@ class Config(object):
config_parser.add_argument(
"--report-stats",
action="store",
- help="Stuff",
+ help="Whether the generated config reports anonymized usage statistics",
choices=["yes", "no"]
)
config_parser.add_argument(
@@ -197,36 +228,11 @@ class Config(object):
)
config_args, remaining_args = config_parser.parse_known_args(argv)
+ config_files = find_config_files(search_paths=config_args.config_path)
+
generate_keys = config_args.generate_keys
- config_files = []
- if config_args.config_path:
- for config_path in config_args.config_path:
- if os.path.isdir(config_path):
- # We accept specifying directories as config paths, we search
- # inside that directory for all files matching *.yaml, and then
- # we apply them in *sorted* order.
- files = []
- for entry in os.listdir(config_path):
- entry_path = os.path.join(config_path, entry)
- if not os.path.isfile(entry_path):
- print (
- "Found subdirectory in config directory: %r. IGNORING."
- ) % (entry_path, )
- continue
-
- if not entry.endswith(".yaml"):
- print (
- "Found file in config directory that does not"
- " end in '.yaml': %r. IGNORING."
- ) % (entry_path, )
- continue
-
- files.append(entry_path)
-
- config_files.extend(sorted(files))
- else:
- config_files.append(config_path)
+ obj = cls()
if config_args.generate_config:
if config_args.report_stats is None:
@@ -299,28 +305,43 @@ class Config(object):
" -c CONFIG-FILE\""
)
- if config_args.keys_directory:
- config_dir_path = config_args.keys_directory
- else:
- config_dir_path = os.path.dirname(config_args.config_path[-1])
- config_dir_path = os.path.abspath(config_dir_path)
+ obj.read_config_files(
+ config_files,
+ keys_directory=config_args.keys_directory,
+ generate_keys=generate_keys,
+ )
+
+ if generate_keys:
+ return None
+
+ obj.invoke_all("read_arguments", args)
+
+ return obj
+
+ def read_config_files(self, config_files, keys_directory=None,
+ generate_keys=False):
+ if not keys_directory:
+ keys_directory = os.path.dirname(config_files[-1])
+
+ config_dir_path = os.path.abspath(keys_directory)
specified_config = {}
for config_file in config_files:
- yaml_config = cls.read_config_file(config_file)
+ yaml_config = self.read_config_file(config_file)
specified_config.update(yaml_config)
if "server_name" not in specified_config:
raise ConfigError(MISSING_SERVER_NAME)
server_name = specified_config["server_name"]
- _, config = obj.generate_config(
+ _, config = self.generate_config(
config_dir_path=config_dir_path,
server_name=server_name,
is_generating_file=False,
)
config.pop("log_config")
config.update(specified_config)
+
if "report_stats" not in config:
raise ConfigError(
MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
@@ -328,11 +349,51 @@ class Config(object):
)
if generate_keys:
- obj.invoke_all("generate_files", config)
+ self.invoke_all("generate_files", config)
return
- obj.invoke_all("read_config", config)
-
- obj.invoke_all("read_arguments", args)
-
- return obj
+ self.invoke_all("read_config", config)
+
+
+def find_config_files(search_paths):
+ """Finds config files using a list of search paths. If a path is a file
+ then that file path is added to the list. If a search path is a directory
+ then all the "*.yaml" files in that directory are added to the list in
+ sorted order.
+
+ Args:
+ search_paths(list(str)): A list of paths to search.
+
+ Returns:
+ list(str): A list of file paths.
+ """
+
+ config_files = []
+ if search_paths:
+ for config_path in search_paths:
+ if os.path.isdir(config_path):
+ # We accept specifying directories as config paths, we search
+ # inside that directory for all files matching *.yaml, and then
+ # we apply them in *sorted* order.
+ files = []
+ for entry in os.listdir(config_path):
+ entry_path = os.path.join(config_path, entry)
+ if not os.path.isfile(entry_path):
+ print (
+ "Found subdirectory in config directory: %r. IGNORING."
+ ) % (entry_path, )
+ continue
+
+ if not entry.endswith(".yaml"):
+ print (
+ "Found file in config directory that does not"
+ " end in '.yaml': %r. IGNORING."
+ ) % (entry_path, )
+ continue
+
+ files.append(entry_path)
+
+ config_files.extend(sorted(files))
+ else:
+ config_files.append(config_path)
+ return config_files
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 90bdd08f00..a187161272 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -89,7 +89,7 @@ class EmailConfig(Config):
# enable_notifs: false
# smtp_host: "localhost"
# smtp_port: 25
- # notif_from: Your Friendly Matrix Home Server <noreply@example.com>
+ # notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
# app_name: Matrix
# template_dir: res/templates
# notif_template_html: notif_mail.html
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 0b5f462e44..44b8d422e0 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import Config
+from ._base import Config, ConfigError
class ServerConfig(Config):
@@ -29,6 +29,7 @@ class ServerConfig(Config):
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", True)
self.public_baseurl = config.get("public_baseurl")
+ self.secondary_directory_servers = config.get("secondary_directory_servers", [])
if self.public_baseurl is not None:
if self.public_baseurl[-1] != '/':
@@ -37,6 +38,20 @@ class ServerConfig(Config):
self.listeners = config.get("listeners", [])
+ thresholds = config.get("gc_thresholds", None)
+ if thresholds is not None:
+ try:
+ assert len(thresholds) == 3
+ self.gc_thresholds = (
+ int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+ )
+ except:
+ raise ConfigError(
+ "Value of `gc_threshold` must be a list of three integers if set"
+ )
+ else:
+ self.gc_thresholds = None
+
bind_port = config.get("bind_port")
if bind_port:
self.listeners = []
@@ -156,6 +171,18 @@ class ServerConfig(Config):
# hard limit.
soft_file_limit: 0
+ # The GC threshold parameters to pass to `gc.set_threshold`, if defined
+ # gc_thresholds: [700, 10, 10]
+
+ # A list of other Home Servers to fetch the public room directory from
+ # and include in the public room directory of this home server
+ # This is a temporary stopgap solution to populate new server with a
+ # list of rooms until there exists a good solution of a decentralized
+ # room directory.
+ # secondary_directory_servers:
+ # - matrix.org
+ # - vector.im
+
# List of ports that Synapse should listen on, their purpose and their
# configuration.
listeners:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 37ee469fa2..d835c1b038 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -24,6 +24,7 @@ from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
@@ -551,6 +552,25 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
+ def get_public_rooms(self, destinations):
+ results_by_server = {}
+
+ @defer.inlineCallbacks
+ def _get_result(s):
+ if s == self.server_name:
+ defer.returnValue()
+
+ try:
+ result = yield self.transport_layer.get_public_rooms(s)
+ results_by_server[s] = result
+ except:
+ logger.exception("Error getting room list from server %r", s)
+
+ yield concurrently_execute(_get_result, destinations, 3)
+
+ defer.returnValue(results_by_server)
+
+ @defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth):
"""
Params:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f1d231b9d8..9f2a64dede 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -377,10 +377,20 @@ class FederationServer(FederationBase):
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
+ logger.info(
+ "on_get_missing_events: earliest_events: %r, latest_events: %r,"
+ " limit: %d, min_depth: %d",
+ earliest_events, latest_events, limit, min_depth
+ )
missing_events = yield self.handler.on_get_missing_events(
origin, room_id, earliest_events, latest_events, limit, min_depth
)
+ if len(missing_events) < 5:
+ logger.info("Returning %d events: %r", len(missing_events), missing_events)
+ else:
+ logger.info("Returning %d events", len(missing_events))
+
time_now = self._clock.time_msec()
defer.returnValue({
@@ -490,6 +500,11 @@ class FederationServer(FederationBase):
latest = set(latest)
latest |= seen
+ logger.info(
+ "Missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
+
missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
@@ -517,6 +532,10 @@ class FederationServer(FederationBase):
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if prevs - seen:
+ logger.info(
+ "Still missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
fetch_state = True
if fetch_state:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index cd2841c4db..ebb698e278 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -226,6 +226,18 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
+ def get_public_rooms(self, remote_server):
+ path = PREFIX + "/publicRooms"
+
+ response = yield self.client.get_json(
+ destination=remote_server,
+ path=path,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
+ @log_function
def exchange_third_party_invite(self, destination, room_id, event_dict):
path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 5b6c7d11dd..6fc3e2207c 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -134,10 +134,12 @@ class Authenticator(object):
class BaseFederationServlet(object):
- def __init__(self, handler, authenticator, ratelimiter, server_name):
+ def __init__(self, handler, authenticator, ratelimiter, server_name,
+ room_list_handler):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
+ self.room_list_handler = room_list_handler
def _wrap(self, code):
authenticator = self.authenticator
@@ -492,6 +494,45 @@ class OpenIdUserInfo(BaseFederationServlet):
return code
+class PublicRoomList(BaseFederationServlet):
+ """
+ Fetch the public room list for this server.
+
+ This API returns information in the same format as /publicRooms on the
+ client API, but will only ever include local public rooms and hence is
+ intended for consumption by other home servers.
+
+ GET /publicRooms HTTP/1.1
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+
+ {
+ "chunk": [
+ {
+ "aliases": [
+ "#test:localhost"
+ ],
+ "guest_can_join": false,
+ "name": "test room",
+ "num_joined_members": 3,
+ "room_id": "!whkydVegtvatLfXmPN:localhost",
+ "world_readable": false
+ }
+ ],
+ "end": "END",
+ "start": "START"
+ }
+ """
+
+ PATH = "/publicRooms"
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query):
+ data = yield self.room_list_handler.get_local_public_room_list()
+ defer.returnValue((200, data))
+
+
SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -513,6 +554,7 @@ SERVLET_CLASSES = (
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
OpenIdUserInfo,
+ PublicRoomList,
)
@@ -523,4 +565,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
+ room_list_handler=hs.get_room_list_handler(),
).register(resource)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 9442ae6f1d..d28e07f0d9 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,11 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.appservice.scheduler import AppServiceScheduler
-from synapse.appservice.api import ApplicationServiceApi
from .register import RegistrationHandler
from .room import (
- RoomCreationHandler, RoomListHandler, RoomContextHandler,
+ RoomCreationHandler, RoomContextHandler,
)
from .room_member import RoomMemberHandler
from .message import MessageHandler
@@ -26,8 +24,6 @@ from .federation import FederationHandler
from .profile import ProfileHandler
from .directory import DirectoryHandler
from .admin import AdminHandler
-from .appservice import ApplicationServicesHandler
-from .auth import AuthHandler
from .identity import IdentityHandler
from .receipts import ReceiptsHandler
from .search import SearchHandler
@@ -50,19 +46,9 @@ class Handlers(object):
self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs)
- self.room_list_handler = RoomListHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.receipts_handler = ReceiptsHandler(hs)
- asapi = ApplicationServiceApi(hs)
- self.appservice_handler = ApplicationServicesHandler(
- hs, asapi, AppServiceScheduler(
- clock=hs.get_clock(),
- store=hs.get_datastore(),
- as_api=asapi
- )
- )
- self.auth_handler = AuthHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 75fc74c797..051ccdb380 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
-from synapse.types import UserID
import logging
@@ -35,16 +34,13 @@ def log_failure(failure):
)
-# NB: Purposefully not inheriting BaseHandler since that contains way too much
-# setup code which this handler does not need or use. This makes testing a lot
-# easier.
class ApplicationServicesHandler(object):
- def __init__(self, hs, appservice_api, appservice_scheduler):
+ def __init__(self, hs):
self.store = hs.get_datastore()
- self.hs = hs
- self.appservice_api = appservice_api
- self.scheduler = appservice_scheduler
+ self.is_mine_id = hs.is_mine_id
+ self.appservice_api = hs.get_application_service_api()
+ self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
@defer.inlineCallbacks
@@ -169,8 +165,7 @@ class ApplicationServicesHandler(object):
@defer.inlineCallbacks
def _is_unknown_user(self, user_id):
- user = UserID.from_string(user_id)
- if not self.hs.is_mine(user):
+ if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
defer.returnValue(False)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 68d0d78fc6..200793b5ed 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.types import UserID
-from synapse.api.errors import AuthError, LoginError, Codes
+from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
from synapse.util.async import run_on_reactor
from twisted.web.client import PartialDownloadError
@@ -529,6 +529,11 @@ class AuthHandler(BaseHandler):
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
+ def generate_delete_pusher_token(self, user_id):
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = delete_pusher")
+ return macaroon.serialize()
+
def validate_short_term_login_token_and_get_user_id(self, login_token):
try:
macaroon = pymacaroons.Macaroon.deserialize(login_token)
@@ -563,7 +568,12 @@ class AuthHandler(BaseHandler):
except_access_token_ids = [requester.access_token_id] if requester else []
- yield self.store.user_set_password_hash(user_id, password_hash)
+ try:
+ yield self.store.user_set_password_hash(user_id, password_hash)
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+ raise e
yield self.store.user_delete_access_tokens(
user_id, except_access_token_ids
)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8eeb225811..4bea7f2b19 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler):
super(DirectoryHandler, self).__init__(hs)
self.state = hs.get_state_handler()
+ self.appservice_handler = hs.get_application_service_handler()
self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
@@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler):
)
if not result:
# Query AS to see if it exists
- as_handler = self.hs.get_handlers().appservice_handler
+ as_handler = self.appservice_handler
result = yield as_handler.query_room_alias_exists(room_alias)
defer.returnValue(result)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 648a505e65..ff83c608e7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -66,10 +66,6 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.distributor.observe("user_joined_room", self.user_joined_room)
-
- self.waiting_for_join_list = {}
-
self.store = hs.get_datastore()
self.replication_layer = hs.get_replication_layer()
self.state_handler = hs.get_state_handler()
@@ -1091,15 +1087,6 @@ class FederationHandler(BaseHandler):
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
- @log_function
- def user_joined_room(self, user, room_id):
- waiters = self.waiting_for_join_list.get(
- (user.to_string(), room_id),
- []
- )
- while waiters:
- waiters.pop().callback(None)
-
@defer.inlineCallbacks
@log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c41dafdef5..15caf1950a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -26,9 +26,9 @@ from synapse.types import (
UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
)
from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
+from synapse.util.async import concurrently_execute, run_on_reactor
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import preserve_fn
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -908,13 +908,16 @@ class MessageHandler(BaseHandler):
"Failed to get destination from event %s", s.event_id
)
- with PreserveLoggingContext():
- # Don't block waiting on waking up all the listeners.
+ @defer.inlineCallbacks
+ def _notify():
+ yield run_on_reactor()
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
+ preserve_fn(_notify)()
+
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37f57301fb..6b70fa3817 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -50,6 +50,8 @@ timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
bump_active_time_counter = metrics.register_counter("bump_active_time")
+get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
@@ -68,6 +70,10 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
# How often to resend presence to remote servers
FEDERATION_PING_INTERVAL = 25 * 60 * 1000
+# How long we will wait before assuming that the syncs from an external process
+# are dead.
+EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
@@ -158,15 +164,26 @@ class PresenceHandler(object):
self.serial_to_user = {}
self._next_serial = 1
- # Keeps track of the number of *ongoing* syncs. While this is non zero
- # a user will never go offline.
+ # Keeps track of the number of *ongoing* syncs on this process. While
+ # this is non zero a user will never go offline.
self.user_to_num_current_syncs = {}
+ # Keeps track of the number of *ongoing* syncs on other processes.
+ # While any sync is ongoing on another process the user will never
+ # go offline.
+ # Each process has a unique identifier and an update frequency. If
+ # no update is received from that process within the update period then
+ # we assume that all the sync requests on that process have stopped.
+ # Stored as a dict from process_id to set of user_id, and a dict of
+ # process_id to millisecond timestamp last updated.
+ self.external_process_to_current_syncs = {}
+ self.external_process_last_updated_ms = {}
+
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
self.clock.call_later(
- 30 * 1000,
+ 30,
self.clock.looping_call,
self._handle_timeouts,
5000,
@@ -266,31 +283,48 @@ class PresenceHandler(object):
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
+ logger.info("Handling presence timeouts")
now = self.clock.time_msec()
- with Measure(self.clock, "presence_handle_timeouts"):
- # Fetch the list of users that *may* have timed out. Things may have
- # changed since the timeout was set, so we won't necessarily have to
- # take any action.
- users_to_check = self.wheel_timer.fetch(now)
+ try:
+ with Measure(self.clock, "presence_handle_timeouts"):
+ # Fetch the list of users that *may* have timed out. Things may have
+ # changed since the timeout was set, so we won't necessarily have to
+ # take any action.
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_updated_ms.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_last_updated_ms.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
- states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
- for user_id in set(users_to_check)
- ]
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in users_to_check
+ ]
- timers_fired_counter.inc_by(len(states))
+ timers_fired_counter.inc_by(len(states))
- changes = handle_timeouts(
- states,
- is_mine_fn=self.is_mine_id,
- user_to_num_current_syncs=self.user_to_num_current_syncs,
- now=now,
- )
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.is_mine_id,
+ syncing_user_ids=self.get_currently_syncing_users(),
+ now=now,
+ )
- preserve_fn(self._update_states)(changes)
+ preserve_fn(self._update_states)(changes)
+ except:
+ logger.exception("Exception in _handle_timeouts loop")
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
@@ -363,6 +397,74 @@ class PresenceHandler(object):
defer.returnValue(_user_syncing())
+ def get_currently_syncing_users(self):
+ """Get the set of user ids that are currently syncing on this HS.
+ Returns:
+ set(str): A set of user_id strings.
+ """
+ syncing_user_ids = {
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count
+ }
+ for user_ids in self.external_process_to_current_syncs.values():
+ syncing_user_ids.update(user_ids)
+ return syncing_user_ids
+
+ @defer.inlineCallbacks
+ def update_external_syncs(self, process_id, syncing_user_ids):
+ """Update the syncing users for an external process
+
+ Args:
+ process_id(str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+ as user start and stop syncing against a given process.
+ syncing_user_ids(set(str)): The set of user_ids that are
+ currently syncing on that server.
+ """
+
+ # Grab the previous list of user_ids that were syncing on that process
+ prev_syncing_user_ids = (
+ self.external_process_to_current_syncs.get(process_id, set())
+ )
+ # Grab the current presence state for both the users that are syncing
+ # now and the users that were syncing before this update.
+ prev_states = yield self.current_state_for_users(
+ syncing_user_ids | prev_syncing_user_ids
+ )
+ updates = []
+ time_now_ms = self.clock.time_msec()
+
+ # For each new user that is syncing check if we need to mark them as
+ # being online.
+ for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+ prev_state = prev_states[new_user_id]
+ if prev_state.state == PresenceState.OFFLINE:
+ updates.append(prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=time_now_ms,
+ last_user_sync_ts=time_now_ms,
+ ))
+ else:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ ))
+
+ # For each user that is still syncing or stopped syncing update the
+ # last sync time so that we will correctly apply the grace period when
+ # they stop syncing.
+ for old_user_id in prev_syncing_user_ids:
+ prev_state = prev_states[old_user_id]
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ ))
+
+ yield self._update_states(updates)
+
+ # Update the last updated time for the process. We expire the entries
+ # if we don't receive an update in the given timeframe.
+ self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+ self.external_process_to_current_syncs[process_id] = syncing_user_ids
+
@defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
@@ -879,13 +981,13 @@ class PresenceEventSource(object):
user_ids_changed = set()
changed = None
- if from_key and max_token - from_key < 100:
- # For small deltas, its quicker to get all changes and then
- # work out if we share a room or they're in our presence list
+ if from_key:
changed = stream_change_cache.get_all_entities_changed(from_key)
- # get_all_entities_changed can return None
- if changed is not None:
+ if changed is not None and len(changed) < 500:
+ # For small deltas, its quicker to get all changes and then
+ # work out if we share a room or they're in our presence list
+ get_updates_counter.inc("stream")
for other_user_id in changed:
if other_user_id in friends:
user_ids_changed.add(other_user_id)
@@ -897,6 +999,8 @@ class PresenceEventSource(object):
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
+ get_updates_counter.inc("full")
+
user_ids_to_check = set()
for room_id in room_ids:
users = yield self.store.get_users_in_room(room_id)
@@ -935,15 +1039,14 @@ class PresenceEventSource(object):
return self.get_new_events(user, from_key=None, include_offline=False)
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
appropriate.
Args:
user_states(list): List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours
- user_to_num_current_syncs (dict): Mapping of user_id to number of currently
- active syncs.
+ syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
Returns:
@@ -954,21 +1057,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
for state in user_states:
is_mine = is_mine_fn(state.user_id)
- new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+ new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
if new_state:
changes[state.user_id] = new_state
return changes.values()
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
state (UserPresenceState)
is_mine (bool): Whether the user is ours
- user_to_num_current_syncs (dict): Mapping of user_id to number of currently
- active syncs.
+ syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
Returns:
@@ -1002,7 +1104,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
# If there are have been no sync for a while (and none ongoing),
# set presence to offline
- if not user_to_num_current_syncs.get(user_id, 0):
+ if user_id not in syncing_user_ids:
if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 5883b9111e..e0aaefe7be 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -16,7 +16,7 @@
"""Contains functions for registering clients."""
from twisted.internet import defer
-from synapse.types import UserID
+from synapse.types import UserID, Requester
from synapse.api.errors import (
AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
)
@@ -360,7 +360,8 @@ class RegistrationHandler(BaseHandler):
@defer.inlineCallbacks
def get_or_create_user(self, localpart, displayname, duration_seconds):
- """Creates a new user or returns an access token for an existing one
+ """Creates a new user if the user does not exist,
+ else revokes all previous access tokens and generates a new one.
Args:
localpart : The local part of the user ID to register. If None,
@@ -387,8 +388,8 @@ class RegistrationHandler(BaseHandler):
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
- auth_handler = self.hs.get_handlers().auth_handler
- token = auth_handler.generate_short_term_login_token(user_id, duration_seconds)
+ token = self.auth_handler().generate_short_term_login_token(
+ user_id, duration_seconds)
if need_register:
yield self.store.register(
@@ -399,20 +400,20 @@ class RegistrationHandler(BaseHandler):
yield registered_user(self.distributor, user)
else:
- yield self.store.flush_user(user_id=user_id)
+ yield self.store.user_delete_access_tokens(user_id=user_id)
yield self.store.add_access_token_to_user(user_id=user_id, token=token)
if displayname is not None:
logger.info("setting user display name: %s -> %s", user_id, displayname)
profile_handler = self.hs.get_handlers().profile_handler
yield profile_handler.set_displayname(
- user, user, displayname
+ user, Requester(user, token, False), displayname
)
defer.returnValue((user_id, token))
def auth_handler(self):
- return self.hs.get_handlers().auth_handler
+ return self.hs.get_auth_handler()
@defer.inlineCallbacks
def guest_access_token_for(self, medium, address, inviter_user_id):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3d63b3c513..ae44c7a556 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -20,7 +20,7 @@ from ._base import BaseHandler
from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
- EventTypes, JoinRules, RoomCreationPreset,
+ EventTypes, JoinRules, RoomCreationPreset, Membership,
)
from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.util import stringutils
@@ -36,6 +36,8 @@ import string
logger = logging.getLogger(__name__)
+REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
+
id_server_scheme = "https://"
@@ -344,8 +346,14 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache()
+ self.remote_list_request_cache = ResponseCache()
+ self.remote_list_cache = {}
+ self.fetch_looping_call = hs.get_clock().looping_call(
+ self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
+ )
+ self.fetch_all_remote_lists()
- def get_public_room_list(self):
+ def get_local_public_room_list(self):
result = self.response_cache.get(())
if not result:
result = self.response_cache.set((), self._get_public_room_list())
@@ -359,14 +367,10 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def handle_room(room_id):
- # We pull each bit of state out indvidually to avoid pulling the
- # full state into memory. Due to how the caching works this should
- # be fairly quick, even if not originally in the cache.
- def get_state(etype, state_key):
- return self.state_handler.get_current_state(room_id, etype, state_key)
+ current_state = yield self.state_handler.get_current_state(room_id)
# Double check that this is actually a public room.
- join_rules_event = yield get_state(EventTypes.JoinRules, "")
+ join_rules_event = current_state.get((EventTypes.JoinRules, ""))
if join_rules_event:
join_rule = join_rules_event.content.get("join_rule", None)
if join_rule and join_rule != JoinRules.PUBLIC:
@@ -374,47 +378,51 @@ class RoomListHandler(BaseHandler):
result = {"room_id": room_id}
- joined_users = yield self.store.get_users_in_room(room_id)
- if len(joined_users) == 0:
+ num_joined_users = len([
+ 1 for _, event in current_state.items()
+ if event.type == EventTypes.Member
+ and event.membership == Membership.JOIN
+ ])
+ if num_joined_users == 0:
return
- result["num_joined_members"] = len(joined_users)
+ result["num_joined_members"] = num_joined_users
aliases = yield self.store.get_aliases_for_room(room_id)
if aliases:
result["aliases"] = aliases
- name_event = yield get_state(EventTypes.Name, "")
+ name_event = yield current_state.get((EventTypes.Name, ""))
if name_event:
name = name_event.content.get("name", None)
if name:
result["name"] = name
- topic_event = yield get_state(EventTypes.Topic, "")
+ topic_event = current_state.get((EventTypes.Topic, ""))
if topic_event:
topic = topic_event.content.get("topic", None)
if topic:
result["topic"] = topic
- canonical_event = yield get_state(EventTypes.CanonicalAlias, "")
+ canonical_event = current_state.get((EventTypes.CanonicalAlias, ""))
if canonical_event:
canonical_alias = canonical_event.content.get("alias", None)
if canonical_alias:
result["canonical_alias"] = canonical_alias
- visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "")
+ visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, ""))
visibility = None
if visibility_event:
visibility = visibility_event.content.get("history_visibility", None)
result["world_readable"] = visibility == "world_readable"
- guest_event = yield get_state(EventTypes.GuestAccess, "")
+ guest_event = current_state.get((EventTypes.GuestAccess, ""))
guest = None
if guest_event:
guest = guest_event.content.get("guest_access", None)
result["guest_can_join"] = guest == "can_join"
- avatar_event = yield get_state("m.room.avatar", "")
+ avatar_event = current_state.get(("m.room.avatar", ""))
if avatar_event:
avatar_url = avatar_event.content.get("url", None)
if avatar_url:
@@ -427,6 +435,55 @@ class RoomListHandler(BaseHandler):
# FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": results})
+ @defer.inlineCallbacks
+ def fetch_all_remote_lists(self):
+ deferred = self.hs.get_replication_layer().get_public_rooms(
+ self.hs.config.secondary_directory_servers
+ )
+ self.remote_list_request_cache.set((), deferred)
+ self.remote_list_cache = yield deferred
+
+ @defer.inlineCallbacks
+ def get_aggregated_public_room_list(self):
+ """
+ Get the public room list from this server and the servers
+ specified in the secondary_directory_servers config option.
+ XXX: Pagination...
+ """
+ # We return the results from out cache which is updated by a looping call,
+ # unless we're missing a cache entry, in which case wait for the result
+ # of the fetch if there's one in progress. If not, omit that server.
+ wait = False
+ for s in self.hs.config.secondary_directory_servers:
+ if s not in self.remote_list_cache:
+ logger.warn("No cached room list from %s: waiting for fetch", s)
+ wait = True
+ break
+
+ if wait and self.remote_list_request_cache.get(()):
+ yield self.remote_list_request_cache.get(())
+
+ public_rooms = yield self.get_local_public_room_list()
+
+ # keep track of which room IDs we've seen so we can de-dup
+ room_ids = set()
+
+ # tag all the ones in our list with our server name.
+ # Also add the them to the de-deping set
+ for room in public_rooms['chunk']:
+ room["server_name"] = self.hs.hostname
+ room_ids.add(room["room_id"])
+
+ # Now add the results from federation
+ for server_name, server_result in self.remote_list_cache.items():
+ for room in server_result["chunk"]:
+ if room["room_id"] not in room_ids:
+ room["server_name"] = server_name
+ public_rooms["chunk"].append(room)
+ room_ids.add(room["room_id"])
+
+ defer.returnValue(public_rooms)
+
class RoomContextHandler(BaseHandler):
@defer.inlineCallbacks
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b47fefc13f..451182cfec 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -210,9 +210,8 @@ class SyncHandler(object):
@defer.inlineCallbacks
def push_rules_for_user(self, user):
user_id = user.to_string()
- rawrules = yield self.store.get_push_rules_for_user(user_id)
- enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
- rules = format_push_rules_for_user(user, rawrules, enabled_map)
+ rules = yield self.store.get_push_rules_for_user(user_id)
+ rules = format_push_rules_for_user(user, rules)
defer.returnValue(rules)
@defer.inlineCallbacks
@@ -654,6 +653,9 @@ class SyncHandler(object):
)
presence.extend(states)
+ # Deduplicate the presence entries so that there's at most one per user
+ presence = {p["content"]["user_id"]: p for p in presence}.values()
+
presence = sync_config.filter_collection.filter_presence(
presence
)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d46f05f426..5589296c09 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
# A tiny object useful for storing a user's membership in a room, as a mapping
# key
-RoomMember = namedtuple("RoomMember", ("room_id", "user"))
+RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
class TypingHandler(object):
@@ -38,7 +38,7 @@ class TypingHandler(object):
self.store = hs.get_datastore()
self.server_name = hs.config.server_name
self.auth = hs.get_auth()
- self.is_mine = hs.is_mine
+ self.is_mine_id = hs.is_mine_id
self.notifier = hs.get_notifier()
self.clock = hs.get_clock()
@@ -67,20 +67,23 @@ class TypingHandler(object):
@defer.inlineCallbacks
def started_typing(self, target_user, auth_user, room_id, timeout):
- if not self.is_mine(target_user):
+ target_user_id = target_user.to_string()
+ auth_user_id = auth_user.to_string()
+
+ if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this Home Server")
- if target_user != auth_user:
+ if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_joined_room(room_id, target_user.to_string())
+ yield self.auth.check_joined_room(room_id, target_user_id)
logger.debug(
- "%s has started typing in %s", target_user.to_string(), room_id
+ "%s has started typing in %s", target_user_id, room_id
)
until = self.clock.time_msec() + timeout
- member = RoomMember(room_id=room_id, user=target_user)
+ member = RoomMember(room_id=room_id, user_id=target_user_id)
was_present = member in self._member_typing_until
@@ -104,25 +107,28 @@ class TypingHandler(object):
yield self._push_update(
room_id=room_id,
- user=target_user,
+ user_id=target_user_id,
typing=True,
)
@defer.inlineCallbacks
def stopped_typing(self, target_user, auth_user, room_id):
- if not self.is_mine(target_user):
+ target_user_id = target_user.to_string()
+ auth_user_id = auth_user.to_string()
+
+ if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this Home Server")
- if target_user != auth_user:
+ if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_joined_room(room_id, target_user.to_string())
+ yield self.auth.check_joined_room(room_id, target_user_id)
logger.debug(
- "%s has stopped typing in %s", target_user.to_string(), room_id
+ "%s has stopped typing in %s", target_user_id, room_id
)
- member = RoomMember(room_id=room_id, user=target_user)
+ member = RoomMember(room_id=room_id, user_id=target_user_id)
if member in self._member_typing_timer:
self.clock.cancel_call_later(self._member_typing_timer[member])
@@ -132,8 +138,9 @@ class TypingHandler(object):
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
- if self.is_mine(user):
- member = RoomMember(room_id=room_id, user=user)
+ user_id = user.to_string()
+ if self.is_mine_id(user_id):
+ member = RoomMember(room_id=room_id, user_id=user_id)
yield self._stopped_typing(member)
@defer.inlineCallbacks
@@ -144,7 +151,7 @@ class TypingHandler(object):
yield self._push_update(
room_id=member.room_id,
- user=member.user,
+ user_id=member.user_id,
typing=False,
)
@@ -156,7 +163,7 @@ class TypingHandler(object):
del self._member_typing_timer[member]
@defer.inlineCallbacks
- def _push_update(self, room_id, user, typing):
+ def _push_update(self, room_id, user_id, typing):
domains = yield self.store.get_joined_hosts_for_room(room_id)
deferreds = []
@@ -164,7 +171,7 @@ class TypingHandler(object):
if domain == self.server_name:
self._push_update_local(
room_id=room_id,
- user=user,
+ user_id=user_id,
typing=typing
)
else:
@@ -173,7 +180,7 @@ class TypingHandler(object):
edu_type="m.typing",
content={
"room_id": room_id,
- "user_id": user.to_string(),
+ "user_id": user_id,
"typing": typing,
},
))
@@ -183,23 +190,26 @@ class TypingHandler(object):
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
room_id = content["room_id"]
- user = UserID.from_string(content["user_id"])
+ user_id = content["user_id"]
+
+ # Check that the string is a valid user id
+ UserID.from_string(user_id)
domains = yield self.store.get_joined_hosts_for_room(room_id)
if self.server_name in domains:
self._push_update_local(
room_id=room_id,
- user=user,
+ user_id=user_id,
typing=content["typing"]
)
- def _push_update_local(self, room_id, user, typing):
+ def _push_update_local(self, room_id, user_id, typing):
room_set = self._room_typing.setdefault(room_id, set())
if typing:
- room_set.add(user)
+ room_set.add(user_id)
else:
- room_set.discard(user)
+ room_set.discard(user_id)
self._latest_room_serial += 1
self._room_serials[room_id] = self._latest_room_serial
@@ -211,13 +221,14 @@ class TypingHandler(object):
def get_all_typing_updates(self, last_id, current_id):
# TODO: Work out a way to do this without scanning the entire state.
+ if last_id == current_id:
+ return []
+
rows = []
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
typing = self._room_typing[room_id]
- typing_bytes = json.dumps([
- u.to_string() for u in typing
- ], ensure_ascii=False)
+ typing_bytes = json.dumps(list(typing), ensure_ascii=False)
rows.append((serial, room_id, typing_bytes))
rows.sort()
return rows
@@ -239,7 +250,7 @@ class TypingNotificationEventSource(object):
"type": "m.typing",
"room_id": room_id,
"content": {
- "user_ids": [u.to_string() for u in typing],
+ "user_ids": list(typing),
},
}
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 5664d5a381..bdd7292a30 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -22,6 +22,7 @@ import functools
import os
import stat
import time
+import gc
from twisted.internet import reactor
@@ -33,11 +34,7 @@ from .metric import (
logger = logging.getLogger(__name__)
-# We'll keep all the available metrics in a single toplevel dict, one shared
-# for the entire process. We don't currently support per-HomeServer instances
-# of metrics, because in practice any one python VM will host only one
-# HomeServer anyway. This makes a lot of implementation neater
-all_metrics = {}
+all_metrics = []
class Metrics(object):
@@ -53,7 +50,7 @@ class Metrics(object):
metric = metric_class(full_name, *args, **kwargs)
- all_metrics[full_name] = metric
+ all_metrics.append(metric)
return metric
def register_counter(self, *args, **kwargs):
@@ -84,12 +81,12 @@ def render_all():
# TODO(paul): Internal hack
update_resource_metrics()
- for name in sorted(all_metrics.keys()):
+ for metric in all_metrics:
try:
- strs += all_metrics[name].render()
+ strs += metric.render()
except Exception:
- strs += ["# FAILED to render %s" % name]
- logger.exception("Failed to render %s metric", name)
+ strs += ["# FAILED to render"]
+ logger.exception("Failed to render metric")
strs.append("") # to generate a final CRLF
@@ -156,6 +153,13 @@ reactor_metrics = get_metrics_for("reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
+gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])
+
+reactor_metrics.register_callback(
+ "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
+)
+
def runUntilCurrentTimer(func):
@@ -182,6 +186,22 @@ def runUntilCurrentTimer(func):
end = time.time() * 1000
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
+
+ # Check if we need to do a manual GC (since its been disabled), and do
+ # one if necessary.
+ threshold = gc.get_threshold()
+ counts = gc.get_count()
+ for i in (2, 1, 0):
+ if threshold[i] < counts[i]:
+ logger.info("Collecting gc %d", i)
+
+ start = time.time() * 1000
+ unreachable = gc.collect(i)
+ end = time.time() * 1000
+
+ gc_time.inc_by(end - start, i)
+ gc_unreachable.inc_by(unreachable, i)
+
return ret
return f
@@ -196,5 +216,9 @@ try:
# runUntilCurrent is called when we have pending calls. It is called once
# per iteratation after fd polling.
reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
+
+ # We manually run the GC each reactor tick so that we can get some metrics
+ # about time spent doing GC,
+ gc.disable()
except AttributeError:
pass
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index 368fc24984..341043952a 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -47,9 +47,6 @@ class BaseMetric(object):
for k, v in zip(self.labels, values)])
)
- def render(self):
- return map_concat(self.render_item, sorted(self.counts.keys()))
-
class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
@@ -83,6 +80,9 @@ class CounterMetric(BaseMetric):
def render_item(self, k):
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
+ def render(self):
+ return map_concat(self.render_item, sorted(self.counts.keys()))
+
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
@@ -126,30 +126,30 @@ class DistributionMetric(object):
class CacheMetric(object):
- """A combination of two CounterMetrics, one to count cache hits and one to
- count a total, and a callback metric to yield the current size.
+ __slots__ = ("name", "cache_name", "hits", "misses", "size_callback")
- This metric generates standard metric name pairs, so that monitoring rules
- can easily be applied to measure hit ratio."""
-
- def __init__(self, name, size_callback, labels=[]):
+ def __init__(self, name, size_callback, cache_name):
self.name = name
+ self.cache_name = cache_name
- self.hits = CounterMetric(name + ":hits", labels=labels)
- self.total = CounterMetric(name + ":total", labels=labels)
+ self.hits = 0
+ self.misses = 0
- self.size = CallbackMetric(
- name + ":size",
- callback=size_callback,
- labels=labels,
- )
+ self.size_callback = size_callback
- def inc_hits(self, *values):
- self.hits.inc(*values)
- self.total.inc(*values)
+ def inc_hits(self):
+ self.hits += 1
- def inc_misses(self, *values):
- self.total.inc(*values)
+ def inc_misses(self):
+ self.misses += 1
def render(self):
- return self.hits.render() + self.total.render() + self.size.render()
+ size = self.size_callback()
+ hits = self.hits
+ total = self.misses + self.hits
+
+ return [
+ """%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
+ """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
+ """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
+ ]
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 33b79c0ec7..30883a0696 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,7 +14,7 @@
# limitations under the License.
from twisted.internet import defer
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.util.logutils import log_function
@@ -140,8 +140,6 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs):
- self.hs = hs
-
self.user_to_user_stream = {}
self.room_to_user_streams = {}
self.appservice_to_user_streams = {}
@@ -151,10 +149,8 @@ class Notifier(object):
self.pending_new_room_events = []
self.clock = hs.get_clock()
-
- hs.get_distributor().observe(
- "user_joined_room", self._user_joined_room
- )
+ self.appservice_handler = hs.get_application_service_handler()
+ self.state_handler = hs.get_state_handler()
self.clock.looping_call(
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
@@ -232,9 +228,7 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- self.hs.get_handlers().appservice_handler.notify_interested_services(
- event
- )
+ self.appservice_handler.notify_interested_services(event)
app_streams = set()
@@ -250,6 +244,9 @@ class Notifier(object):
)
app_streams |= app_user_streams
+ if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+ self._user_joined_room(event.state_key, event.room_id)
+
self.on_new_event(
"room_key", room_stream_id,
users=extra_users,
@@ -449,7 +446,7 @@ class Notifier(object):
@defer.inlineCallbacks
def _is_world_readable(self, room_id):
- state = yield self.hs.get_state_handler().get_current_state(
+ state = yield self.state_handler.get_current_state(
room_id,
EventTypes.RoomHistoryVisibility
)
@@ -485,9 +482,8 @@ class Notifier(object):
user_stream.appservice, set()
).add(user_stream)
- def _user_joined_room(self, user, room_id):
- user = str(user)
- new_user_stream = self.user_to_user_stream.get(user)
+ def _user_joined_room(self, user_id, room_id):
+ new_user_stream = self.user_to_user_stream.get(user_id)
if new_user_stream is not None:
room_streams = self.room_to_user_streams.setdefault(room_id, set())
room_streams.add(new_user_stream)
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 9b208668b6..46e768e35c 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,7 +40,7 @@ class ActionGenerator:
def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "handle_push_actions_for_event"):
bulk_evaluator = yield evaluator_for_event(
- event, self.hs, self.store
+ event, self.hs, self.store, context.current_state
)
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 25e13b3423..756e5da513 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -14,84 +14,56 @@
# limitations under the License.
import logging
-import ujson as json
from twisted.internet import defer
-from .baserules import list_with_base_rules
from .push_rule_evaluator import PushRuleEvaluatorForEvent
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.visibility import filter_events_for_clients
logger = logging.getLogger(__name__)
-def decode_rule_json(rule):
- rule['conditions'] = json.loads(rule['conditions'])
- rule['actions'] = json.loads(rule['actions'])
- return rule
-
-
@defer.inlineCallbacks
def _get_rules(room_id, user_ids, store):
rules_by_user = yield store.bulk_get_push_rules(user_ids)
- rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
-
- rules_by_user = {
- uid: list_with_base_rules([
- decode_rule_json(rule_list)
- for rule_list in rules_by_user.get(uid, [])
- ])
- for uid in user_ids
- }
-
- # We apply the rules-enabled map here: bulk_get_push_rules doesn't
- # fetch disabled rules, but this won't account for any server default
- # rules the user has disabled, so we need to do this too.
- for uid in user_ids:
- if uid not in rules_enabled_by_user:
- continue
-
- user_enabled_map = rules_enabled_by_user[uid]
-
- for i, rule in enumerate(rules_by_user[uid]):
- rule_id = rule['rule_id']
-
- if rule_id in user_enabled_map:
- if rule.get('enabled', True) != bool(user_enabled_map[rule_id]):
- # Rules are cached across users.
- rule = dict(rule)
- rule['enabled'] = bool(user_enabled_map[rule_id])
- rules_by_user[uid][i] = rule
+
+ rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
defer.returnValue(rules_by_user)
@defer.inlineCallbacks
-def evaluator_for_event(event, hs, store):
+def evaluator_for_event(event, hs, store, current_state):
room_id = event.room_id
-
- # users in the room who have pushers need to get push rules run because
- # that's how their pushers work
- users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
-
# We also will want to generate notifs for other people in the room so
# their unread countss are correct in the event stream, but to avoid
# generating them for bot / AS users etc, we only do so for people who've
# sent a read receipt into the room.
- all_in_room = yield store.get_users_in_room(room_id)
- all_in_room = set(all_in_room)
+ local_users_in_room = set(
+ e.state_key for e in current_state.values()
+ if e.type == EventTypes.Member and e.membership == Membership.JOIN
+ and hs.is_mine_id(e.state_key)
+ )
+
+ # users in the room who have pushers need to get push rules run because
+ # that's how their pushers work
+ if_users_with_pushers = yield store.get_if_users_have_pushers(
+ local_users_in_room
+ )
+ user_ids = set(
+ uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
+ )
- receipts = yield store.get_receipts_for_room(room_id, "m.read")
+ users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
# any users with pushers must be ours: they have pushers
- user_ids = set(users_with_pushers)
- for r in receipts:
- if hs.is_mine_id(r['user_id']) and r['user_id'] in all_in_room:
- user_ids.add(r['user_id'])
+ for uid in users_with_receipts:
+ if uid in local_users_in_room:
+ user_ids.add(uid)
# if this event is an invite event, we may need to run rules for the user
# who's been invited, otherwise they won't get told they've been invited
@@ -102,8 +74,6 @@ def evaluator_for_event(event, hs, store):
if has_pusher:
user_ids.add(invited_user)
- user_ids = list(user_ids)
-
rules_by_user = yield _get_rules(room_id, user_ids, store)
defer.returnValue(BulkPushRuleEvaluator(
@@ -141,7 +111,10 @@ class BulkPushRuleEvaluator:
self.store, user_tuples, [event], {event.event_id: current_state}
)
- room_members = yield self.store.get_users_in_room(self.room_id)
+ room_members = set(
+ e.state_key for e in current_state.values()
+ if e.type == EventTypes.Member and e.membership == Membership.JOIN
+ )
evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index ae9db9ec2f..e0331b2d2d 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -13,29 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.push.baserules import list_with_base_rules
-
from synapse.push.rulekinds import (
PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
)
import copy
-import simplejson as json
-def format_push_rules_for_user(user, rawrules, enabled_map):
+def format_push_rules_for_user(user, ruleslist):
"""Converts a list of rawrules and a enabled map into nested dictionaries
to match the Matrix client-server format for push rules"""
- ruleslist = []
- for rawrule in rawrules:
- rule = dict(rawrule)
- rule["conditions"] = json.loads(rawrule["conditions"])
- rule["actions"] = json.loads(rawrule["actions"])
- ruleslist.append(rule)
-
# We're going to be mutating this a lot, so do a deep copy
- ruleslist = copy.deepcopy(list_with_base_rules(ruleslist))
+ ruleslist = copy.deepcopy(ruleslist)
rules = {'global': {}, 'device': {}}
@@ -60,9 +50,7 @@ def format_push_rules_for_user(user, rawrules, enabled_map):
template_rule = _rule_to_template(r)
if template_rule:
- if r['rule_id'] in enabled_map:
- template_rule['enabled'] = enabled_map[r['rule_id']]
- elif 'enabled' in r:
+ if 'enabled' in r:
template_rule['enabled'] = r['enabled']
else:
template_rule['enabled'] = True
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index b4b728adc5..12a3ec7fd8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -32,12 +32,20 @@ DELAY_BEFORE_MAIL_MS = 10 * 60 * 1000
# Each room maintains its own throttle counter, but each new mail notification
# sends the pending notifications for all rooms.
THROTTLE_START_MS = 10 * 60 * 1000
-THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # (2 * 60 * 1000) * (2 ** 11) # ~3 days
-THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours
+THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # 24h
+# THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours
+THROTTLE_MULTIPLIER = 144 # 10 mins, 24 hours - i.e. jump straight to 1 day
# If no event triggers a notification for this long after the previous,
# the throttle is released.
-THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days
+# 12 hours - a gap of 12 hours in conversation is surely enough to merit a new
+# notification when things get going again...
+THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
+
+# does each email include all unread notifs, or just the ones which have happened
+# since the last mail?
+# XXX: this is currently broken as it includes ones from parted rooms(!)
+INCLUDE_ALL_UNREAD_NOTIFS = False
class EmailPusher(object):
@@ -65,7 +73,12 @@ class EmailPusher(object):
self.processing = False
if self.hs.config.email_enable_notifs:
- self.mailer = Mailer(self.hs)
+ if 'data' in pusherdict and 'brand' in pusherdict['data']:
+ app_name = pusherdict['data']['brand']
+ else:
+ app_name = self.hs.config.email_app_name
+
+ self.mailer = Mailer(self.hs, app_name)
else:
self.mailer = None
@@ -126,8 +139,9 @@ class EmailPusher(object):
up logging, measures and guards against multiple instances of it
being run.
"""
+ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
- self.user_id, self.last_stream_ordering, self.max_stream_ordering
+ self.user_id, start, self.max_stream_ordering
)
soonest_due_at = None
@@ -150,7 +164,6 @@ class EmailPusher(object):
# we then consider all previously outstanding notifications
# to be delivered.
- # debugging:
reason = {
'room_id': push_action['room_id'],
'now': self.clock.time_msec(),
@@ -165,9 +178,12 @@ class EmailPusher(object):
yield self.save_last_stream_ordering_and_success(max([
ea['stream_ordering'] for ea in unprocessed
]))
- yield self.sent_notif_update_throttle(
- push_action['room_id'], push_action
- )
+
+ # we update the throttle on all the possible unprocessed push actions
+ for ea in unprocessed:
+ yield self.sent_notif_update_throttle(
+ ea['room_id'], ea
+ )
break
else:
if soonest_due_at is None or should_notify_at < soonest_due_at:
@@ -263,5 +279,5 @@ class EmailPusher(object):
logger.info("Sending notif email for user %r", self.user_id)
yield self.mailer.send_notification_mail(
- self.user_id, self.email, push_actions, reason
+ self.app_id, self.user_id, self.email, push_actions, reason
)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index c2c2ca3fa7..e5c3929cd7 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -41,11 +41,14 @@ logger = logging.getLogger(__name__)
MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \
- "in the %s room..."
+ "in the %(room)s room..."
MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
-MESSAGES_IN_ROOM = "There are some messages on %(app)s for you in the %(room)s room..."
-MESSAGES_IN_ROOMS = "Here are some messages on %(app)s you may have missed..."
+MESSAGES_IN_ROOM = "You have messages on %(app)s in the %(room)s room..."
+MESSAGES_IN_ROOM_AND_OTHERS = \
+ "You have messages on %(app)s in the %(room)s room and others..."
+MESSAGES_FROM_PERSON_AND_OTHERS = \
+ "You have messages on %(app)s from %(person)s and others..."
INVITE_FROM_PERSON_TO_ROOM = "%(person)s has invited you to join the " \
"%(room)s room on %(app)s..."
INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
@@ -75,12 +78,14 @@ ALLOWED_ATTRS = {
class Mailer(object):
- def __init__(self, hs):
+ def __init__(self, hs, app_name):
self.hs = hs
self.store = self.hs.get_datastore()
+ self.auth_handler = self.hs.get_auth_handler()
self.state_handler = self.hs.get_state_handler()
loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
- self.app_name = self.hs.config.email_app_name
+ self.app_name = app_name
+ logger.info("Created Mailer for app_name %s" % app_name)
env = jinja2.Environment(loader=loader)
env.filters["format_ts"] = format_ts_filter
env.filters["mxc_to_http"] = self.mxc_to_http_filter
@@ -92,8 +97,16 @@ class Mailer(object):
)
@defer.inlineCallbacks
- def send_notification_mail(self, user_id, email_address, push_actions, reason):
- raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1]
+ def send_notification_mail(self, app_id, user_id, email_address,
+ push_actions, reason):
+ try:
+ from_string = self.hs.config.email_notif_from % {
+ "app": self.app_name
+ }
+ except TypeError:
+ from_string = self.hs.config.email_notif_from
+
+ raw_from = email.utils.parseaddr(from_string)[1]
raw_to = email.utils.parseaddr(email_address)[1]
if raw_to == '':
@@ -119,6 +132,8 @@ class Mailer(object):
user_display_name = yield self.store.get_profile_displayname(
UserID.from_string(user_id).localpart
)
+ if user_display_name is None:
+ user_display_name = user_id
except StoreError:
user_display_name = user_id
@@ -128,9 +143,14 @@ class Mailer(object):
state_by_room[room_id] = room_state
# Run at most 3 of these at once: sync does 10 at a time but email
- # notifs are much realtime than sync so we can afford to wait a bit.
+ # notifs are much less realtime than sync so we can afford to wait a bit.
yield concurrently_execute(_fetch_room_state, rooms_in_order, 3)
+ # actually sort our so-called rooms_in_order list, most recent room first
+ rooms_in_order.sort(
+ key=lambda r: -(notifs_by_room[r][-1]['received_ts'] or 0)
+ )
+
rooms = []
for r in rooms_in_order:
@@ -139,17 +159,19 @@ class Mailer(object):
)
rooms.append(roomvars)
- summary_text = self.make_summary_text(
- notifs_by_room, state_by_room, notif_events, user_id
+ reason['room_name'] = calculate_room_name(
+ state_by_room[reason['room_id']], user_id, fallback_to_members=True
)
- reason['room_name'] = calculate_room_name(
- state_by_room[reason['room_id']], user_id, fallback_to_members=False
+ summary_text = self.make_summary_text(
+ notifs_by_room, state_by_room, notif_events, user_id, reason
)
template_vars = {
"user_display_name": user_display_name,
- "unsubscribe_link": self.make_unsubscribe_link(),
+ "unsubscribe_link": self.make_unsubscribe_link(
+ user_id, app_id, email_address
+ ),
"summary_text": summary_text,
"app_name": self.app_name,
"rooms": rooms,
@@ -164,7 +186,7 @@ class Mailer(object):
multipart_msg = MIMEMultipart('alternative')
multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
- multipart_msg['From'] = self.hs.config.email_notif_from
+ multipart_msg['From'] = from_string
multipart_msg['To'] = email_address
multipart_msg['Date'] = email.utils.formatdate()
multipart_msg['Message-ID'] = email.utils.make_msgid()
@@ -251,7 +273,9 @@ class Mailer(object):
sender_state_event = room_state[("m.room.member", event.sender)]
sender_name = name_from_member_event(sender_state_event)
- sender_avatar_url = sender_state_event.content["avatar_url"]
+ sender_avatar_url = None
+ if "avatar_url" in sender_state_event.content:
+ sender_avatar_url = sender_state_event.content["avatar_url"]
# 'hash' for deterministically picking default images: use
# sender_hash % the number of default images to choose from
@@ -296,7 +320,8 @@ class Mailer(object):
return messagevars
- def make_summary_text(self, notifs_by_room, state_by_room, notif_events, user_id):
+ def make_summary_text(self, notifs_by_room, state_by_room,
+ notif_events, user_id, reason):
if len(notifs_by_room) == 1:
# Only one room has new stuff
room_id = notifs_by_room.keys()[0]
@@ -371,9 +396,28 @@ class Mailer(object):
}
else:
# Stuff's happened in multiple different rooms
- return MESSAGES_IN_ROOMS % {
- "app": self.app_name,
- }
+
+ # ...but we still refer to the 'reason' room which triggered the mail
+ if reason['room_name'] is not None:
+ return MESSAGES_IN_ROOM_AND_OTHERS % {
+ "room": reason['room_name'],
+ "app": self.app_name,
+ }
+ else:
+ # If the reason room doesn't have a name, say who the messages
+ # are from explicitly to avoid, "messages in the Bob room"
+ sender_ids = list(set([
+ notif_events[n['event_id']].sender
+ for n in notifs_by_room[reason['room_id']]
+ ]))
+
+ return MESSAGES_FROM_PERSON_AND_OTHERS % {
+ "person": descriptor_from_member_events([
+ state_by_room[reason['room_id']][("m.room.member", s)]
+ for s in sender_ids
+ ]),
+ "app": self.app_name,
+ }
def make_room_link(self, room_id):
# need /beta for Universal Links to work on iOS
@@ -393,9 +437,18 @@ class Mailer(object):
notif['room_id'], notif['event_id']
)
- def make_unsubscribe_link(self):
- # XXX: matrix.to
- return "https://vector.im/#/settings"
+ def make_unsubscribe_link(self, user_id, app_id, email_address):
+ params = {
+ "access_token": self.auth_handler.generate_delete_pusher_token(user_id),
+ "app_id": app_id,
+ "pushkey": email_address,
+ }
+
+ # XXX: make r0 once API is stable
+ return "%s_matrix/client/unstable/pushers/remove?%s" % (
+ self.hs.config.public_baseurl,
+ urllib.urlencode(params),
+ )
def mxc_to_http_filter(self, value, width, height, resize_method="crop"):
if value[0:6] != "mxc://":
diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py
new file mode 100644
index 0000000000..fc18130ab4
--- /dev/null
+++ b/synapse/replication/presence_resource.py
@@ -0,0 +1,59 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import defer
+
+
+class PresenceResource(Resource):
+ """
+ HTTP endpoint for marking users as syncing.
+
+ POST /_synapse/replication/presence HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "process_id": "<process_id>",
+ "syncing_users": ["<user_id>"]
+ }
+ """
+
+ def __init__(self, hs):
+ Resource.__init__(self) # Resource is old-style, so no super()
+
+ self.version_string = hs.version_string
+ self.clock = hs.get_clock()
+ self.presence_handler = hs.get_presence_handler()
+
+ def render_POST(self, request):
+ self._async_render_POST(request)
+ return NOT_DONE_YET
+
+ @request_handler()
+ @defer.inlineCallbacks
+ def _async_render_POST(self, request):
+ content = parse_json_object_from_request(request)
+
+ process_id = content["process_id"]
+ syncing_user_ids = content["syncing_users"]
+
+ yield self.presence_handler.update_external_syncs(
+ process_id, set(syncing_user_ids)
+ )
+
+ respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 847f212a3d..8c2d487ff4 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -16,6 +16,7 @@
from synapse.http.servlet import parse_integer, parse_string
from synapse.http.server import request_handler, finish_request
from synapse.replication.pusher_resource import PusherResource
+from synapse.replication.presence_resource import PresenceResource
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
@@ -115,6 +116,7 @@ class ReplicationResource(Resource):
self.clock = hs.get_clock()
self.putChild("remove_pushers", PusherResource(hs))
+ self.putChild("syncing_users", PresenceResource(hs))
def render_GET(self, request):
self._async_render_GET(request)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index f59b0eabbc..735c03c7eb 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -15,7 +15,10 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
from synapse.storage.account_data import AccountDataStore
+from synapse.storage.tags import TagsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
class SlavedAccountDataStore(BaseSlavedStore):
@@ -25,6 +28,14 @@ class SlavedAccountDataStore(BaseSlavedStore):
self._account_data_id_gen = SlavedIdTracker(
db_conn, "account_data_max_stream_id", "stream_id",
)
+ self._account_data_stream_cache = StreamChangeCache(
+ "AccountDataAndTagsChangeCache",
+ self._account_data_id_gen.get_current_token(),
+ )
+
+ get_account_data_for_user = (
+ AccountDataStore.__dict__["get_account_data_for_user"]
+ )
get_global_account_data_by_type_for_users = (
AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
@@ -34,6 +45,16 @@ class SlavedAccountDataStore(BaseSlavedStore):
AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
)
+ get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+
+ get_updated_tags = DataStore.get_updated_tags.__func__
+ get_updated_account_data_for_user = (
+ DataStore.get_updated_account_data_for_user.__func__
+ )
+
+ def get_max_account_data_stream_id(self):
+ return self._account_data_id_gen.get_current_token()
+
def stream_positions(self):
result = super(SlavedAccountDataStore, self).stream_positions()
position = self._account_data_id_gen.get_current_token()
@@ -47,15 +68,33 @@ class SlavedAccountDataStore(BaseSlavedStore):
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
- user_id, data_type = row[1:3]
+ position, user_id, data_type = row[:3]
self.get_global_account_data_by_type_for_user.invalidate(
(data_type, user_id,)
)
+ self.get_account_data_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
stream = result.get("room_account_data")
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.get_account_data_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
stream = result.get("tag_account_data")
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.get_tags_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ return super(SlavedAccountDataStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
new file mode 100644
index 0000000000..25792d9429
--- /dev/null
+++ b/synapse/replication/slave/storage/appservice.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.config.appservice import load_appservices
+
+
+class SlavedApplicationServiceStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
+ self.services_cache = load_appservices(
+ hs.config.server_name,
+ hs.config.app_service_config_files
+ )
+
+ get_app_service_by_token = DataStore.get_app_service_by_token.__func__
+ get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index c0d741452d..877c68508c 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.event_push_actions import EventPushActionsStore
from synapse.storage.state import StateStore
+from synapse.storage.stream import StreamStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
import ujson as json
@@ -57,6 +58,9 @@ class SlavedEventStore(BaseSlavedStore):
"EventsRoomStreamChangeCache", min_event_val,
prefilled_cache=event_cache_prefill,
)
+ self._membership_stream_cache = StreamChangeCache(
+ "MembershipStreamChangeCache", events_max,
+ )
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
@@ -87,6 +91,9 @@ class SlavedEventStore(BaseSlavedStore):
_get_state_group_from_group = (
StateStore.__dict__["_get_state_group_from_group"]
)
+ get_recent_event_ids_for_room = (
+ StreamStore.__dict__["get_recent_event_ids_for_room"]
+ )
get_unread_push_actions_for_user_in_range = (
DataStore.get_unread_push_actions_for_user_in_range.__func__
@@ -109,24 +116,25 @@ class SlavedEventStore(BaseSlavedStore):
DataStore.get_room_events_stream_for_room.__func__
)
get_events_around = DataStore.get_events_around.__func__
+ get_state_for_event = DataStore.get_state_for_event.__func__
get_state_for_events = DataStore.get_state_for_events.__func__
get_state_groups = DataStore.get_state_groups.__func__
+ get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
+ get_room_events_stream_for_rooms = (
+ DataStore.get_room_events_stream_for_rooms.__func__
+ )
+ get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
- _set_before_and_after = DataStore._set_before_and_after
+ _set_before_and_after = staticmethod(DataStore._set_before_and_after)
_get_events = DataStore._get_events.__func__
_get_events_from_cache = DataStore._get_events_from_cache.__func__
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
- _parse_events_txn = DataStore._parse_events_txn.__func__
- _get_events_txn = DataStore._get_events_txn.__func__
- _get_event_txn = DataStore._get_event_txn.__func__
_enqueue_events = DataStore._enqueue_events.__func__
_do_fetch = DataStore._do_fetch.__func__
- _fetch_events_txn = DataStore._fetch_events_txn.__func__
_fetch_event_rows = DataStore._fetch_event_rows.__func__
_get_event_from_row = DataStore._get_event_from_row.__func__
- _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
@@ -220,9 +228,9 @@ class SlavedEventStore(BaseSlavedStore):
self.get_rooms_for_user.invalidate((event.state_key,))
# self.get_joined_hosts_for_room.invalidate((event.room_id,))
self.get_users_in_room.invalidate((event.room_id,))
- # self._membership_stream_cache.entity_has_changed(
- # event.state_key, event.internal_metadata.stream_ordering
- # )
+ self._membership_stream_cache.entity_has_changed(
+ event.state_key, event.internal_metadata.stream_ordering
+ )
self.get_invited_rooms_for_user.invalidate((event.state_key,))
if not event.is_state():
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
new file mode 100644
index 0000000000..819ed62881
--- /dev/null
+++ b/synapse/replication/slave/storage/filtering.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.filtering import FilteringStore
+
+
+class SlavedFilteringStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedFilteringStore, self).__init__(db_conn, hs)
+
+ # Filters are immutable so this cache doesn't need to be expired
+ get_user_filter = FilteringStore.__dict__["get_user_filter"]
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
new file mode 100644
index 0000000000..703f4a49bf
--- /dev/null
+++ b/synapse/replication/slave/storage/presence.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage import DataStore
+
+
+class SlavedPresenceStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedPresenceStore, self).__init__(db_conn, hs)
+ self._presence_id_gen = SlavedIdTracker(
+ db_conn, "presence_stream", "stream_id",
+ )
+
+ self._presence_on_startup = self._get_active_presence(db_conn)
+
+ self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
+ "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
+ )
+
+ _get_active_presence = DataStore._get_active_presence.__func__
+ take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+ get_presence_for_users = DataStore.get_presence_for_users.__func__
+
+ def get_current_presence_token(self):
+ return self._presence_id_gen.get_current_token()
+
+ def stream_positions(self):
+ result = super(SlavedPresenceStore, self).stream_positions()
+ position = self._presence_id_gen.get_current_token()
+ result["presence"] = position
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("presence")
+ if stream:
+ self._presence_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.presence_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ return super(SlavedPresenceStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
new file mode 100644
index 0000000000..21ceb0213a
--- /dev/null
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .events import SlavedEventStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
+from synapse.storage.push_rule import PushRuleStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class SlavedPushRuleStore(SlavedEventStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedPushRuleStore, self).__init__(db_conn, hs)
+ self._push_rules_stream_id_gen = SlavedIdTracker(
+ db_conn, "push_rules_stream", "stream_id",
+ )
+ self.push_rules_stream_cache = StreamChangeCache(
+ "PushRulesStreamChangeCache",
+ self._push_rules_stream_id_gen.get_current_token(),
+ )
+
+ get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
+ get_push_rules_enabled_for_user = (
+ PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
+ )
+ have_push_rules_changed_for_user = (
+ DataStore.have_push_rules_changed_for_user.__func__
+ )
+
+ def get_push_rules_stream_token(self):
+ return (
+ self._push_rules_stream_id_gen.get_current_token(),
+ self._stream_id_gen.get_current_token(),
+ )
+
+ def stream_positions(self):
+ result = super(SlavedPushRuleStore, self).stream_positions()
+ result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("push_rules")
+ if stream:
+ for row in stream["rows"]:
+ position = row[0]
+ user_id = row[2]
+ self.get_push_rules_for_user.invalidate((user_id,))
+ self.get_push_rules_enabled_for_user.invalidate((user_id,))
+ self.push_rules_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ self._push_rules_stream_id_gen.advance(int(stream["position"]))
+
+ return super(SlavedPushRuleStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index ec007516d0..ac9662d399 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.storage.receipts import ReceiptsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
# So, um, we want to borrow a load of functions intended for reading from
# a DataStore, but we don't want to take functions that either write to the
@@ -37,11 +38,28 @@ class SlavedReceiptsStore(BaseSlavedStore):
db_conn, "receipts_linearized", "stream_id"
)
+ self._receipts_stream_cache = StreamChangeCache(
+ "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
+ )
+
get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+ get_linearized_receipts_for_room = (
+ ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
+ )
+ _get_linearized_receipts_for_rooms = (
+ ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
+ )
+ get_last_receipt_event_id_for_user = (
+ ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
+ )
get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+ get_linearized_receipts_for_rooms = (
+ DataStore.get_linearized_receipts_for_rooms.__func__
+ )
+
def stream_positions(self):
result = super(SlavedReceiptsStore, self).stream_positions()
result["receipts"] = self._receipts_id_gen.get_current_token()
@@ -52,10 +70,15 @@ class SlavedReceiptsStore(BaseSlavedStore):
if stream:
self._receipts_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
- room_id, receipt_type, user_id = row[1:4]
+ position, room_id, receipt_type, user_id = row[:4]
self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
+ self._receipts_stream_cache.entity_has_changed(room_id, position)
return super(SlavedReceiptsStore, self).process_replication(result)
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
+ self.get_linearized_receipts_for_room.invalidate_many((room_id,))
+ self.get_last_receipt_event_id_for_user.invalidate(
+ (user_id, room_id, receipt_type)
+ )
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
new file mode 100644
index 0000000000..307833f9e1
--- /dev/null
+++ b/synapse/replication/slave/storage/registration.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.registration import RegistrationStore
+
+
+class SlavedRegistrationStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedRegistrationStore, self).__init__(db_conn, hs)
+
+ # TODO: use the cached version and invalidate deleted tokens
+ get_user_by_access_token = RegistrationStore.__dict__[
+ "get_user_by_access_token"
+ ].orig
+
+ _query_for_auth = DataStore._query_for_auth.__func__
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index d1afa0f0d5..498bb9e18a 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -45,30 +45,27 @@ class EventStreamRestServlet(ClientV1RestServlet):
raise SynapseError(400, "Guest users must specify room_id param")
if "room_id" in request.args:
room_id = request.args["room_id"][0]
- try:
- handler = self.handlers.event_stream_handler
- pagin_config = PaginationConfig.from_request(request)
- timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
- if "timeout" in request.args:
- try:
- timeout = int(request.args["timeout"][0])
- except ValueError:
- raise SynapseError(400, "timeout must be in milliseconds.")
-
- as_client_event = "raw" not in request.args
-
- chunk = yield handler.get_stream(
- requester.user.to_string(),
- pagin_config,
- timeout=timeout,
- as_client_event=as_client_event,
- affect_presence=(not is_guest),
- room_id=room_id,
- is_guest=is_guest,
- )
- except:
- logger.exception("Event stream failed")
- raise
+
+ handler = self.handlers.event_stream_handler
+ pagin_config = PaginationConfig.from_request(request)
+ timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
+ if "timeout" in request.args:
+ try:
+ timeout = int(request.args["timeout"][0])
+ except ValueError:
+ raise SynapseError(400, "timeout must be in milliseconds.")
+
+ as_client_event = "raw" not in request.args
+
+ chunk = yield handler.get_stream(
+ requester.user.to_string(),
+ pagin_config,
+ timeout=timeout,
+ as_client_event=as_client_event,
+ affect_presence=(not is_guest),
+ room_id=room_id,
+ is_guest=is_guest,
+ )
defer.returnValue((200, chunk))
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 3b5544851b..8df9d10efa 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -58,6 +58,7 @@ class LoginRestServlet(ClientV1RestServlet):
self.cas_required_attributes = hs.config.cas_required_attributes
self.servername = hs.config.server_name
self.http_client = hs.get_simple_http_client()
+ self.auth_handler = self.hs.get_auth_handler()
def on_GET(self, request):
flows = []
@@ -143,7 +144,7 @@ class LoginRestServlet(ClientV1RestServlet):
user_id, self.hs.hostname
).to_string()
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_id, access_token, refresh_token = yield auth_handler.login_with_password(
user_id=user_id,
password=login_submission["password"])
@@ -160,7 +161,7 @@ class LoginRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def do_token_login(self, login_submission):
token = login_submission['token']
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_id = (
yield auth_handler.validate_short_term_login_token_and_get_user_id(token)
)
@@ -194,7 +195,7 @@ class LoginRestServlet(ClientV1RestServlet):
raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
user_id = UserID.create(user, self.hs.hostname).to_string()
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_exists = yield auth_handler.does_user_exist(user_id)
if user_exists:
user_id, access_token, refresh_token = (
@@ -243,7 +244,7 @@ class LoginRestServlet(ClientV1RestServlet):
raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
user_id = UserID.create(user, self.hs.hostname).to_string()
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_exists = yield auth_handler.does_user_exist(user_id)
if user_exists:
user_id, access_token, refresh_token = (
@@ -412,7 +413,7 @@ class CasTicketServlet(ClientV1RestServlet):
raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
user_id = UserID.create(user, self.hs.hostname).to_string()
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_exists = yield auth_handler.does_user_exist(user_id)
if not user_exists:
user_id, _ = (
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index 02d837ee6a..6bb4821ec6 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -128,11 +128,9 @@ class PushRuleRestServlet(ClientV1RestServlet):
# we build up the full structure and then decide which bits of it
# to send which means doing unnecessary work sometimes but is
# is probably not going to make a whole lot of difference
- rawrules = yield self.store.get_push_rules_for_user(user_id)
+ rules = yield self.store.get_push_rules_for_user(user_id)
- enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
-
- rules = format_push_rules_for_user(requester.user, rawrules, enabled_map)
+ rules = format_push_rules_for_user(requester.user, rules)
path = request.postpath[1:]
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index ab928a16da..9a2ed6ed88 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -17,7 +17,11 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, Codes
from synapse.push import PusherConfigException
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import (
+ parse_json_object_from_request, parse_string, RestServlet
+)
+from synapse.http.server import finish_request
+from synapse.api.errors import StoreError
from .base import ClientV1RestServlet, client_path_patterns
@@ -136,6 +140,57 @@ class PushersSetRestServlet(ClientV1RestServlet):
return 200, {}
+class PushersRemoveRestServlet(RestServlet):
+ """
+ To allow pusher to be delete by clicking a link (ie. GET request)
+ """
+ PATTERNS = client_path_patterns("/pushers/remove$")
+ SUCCESS_HTML = "<html><body>You have been unsubscribed</body><html>"
+
+ def __init__(self, hs):
+ super(RestServlet, self).__init__()
+ self.hs = hs
+ self.notifier = hs.get_notifier()
+ self.auth = hs.get_v1auth()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request):
+ requester = yield self.auth.get_user_by_req(request, rights="delete_pusher")
+ user = requester.user
+
+ app_id = parse_string(request, "app_id", required=True)
+ pushkey = parse_string(request, "pushkey", required=True)
+
+ pusher_pool = self.hs.get_pusherpool()
+
+ try:
+ yield pusher_pool.remove_pusher(
+ app_id=app_id,
+ pushkey=pushkey,
+ user_id=user.to_string(),
+ )
+ except StoreError as se:
+ if se.code != 404:
+ # This is fine: they're already unsubscribed
+ raise
+
+ self.notifier.on_new_replication_data()
+
+ request.setResponseCode(200)
+ request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+ request.setHeader(b"Server", self.hs.version_string)
+ request.setHeader(b"Content-Length", b"%d" % (
+ len(PushersRemoveRestServlet.SUCCESS_HTML),
+ ))
+ request.write(PushersRemoveRestServlet.SUCCESS_HTML)
+ finish_request(request)
+ defer.returnValue(None)
+
+ def on_OPTIONS(self, _):
+ return 200, {}
+
+
def register_servlets(hs, http_server):
PushersRestServlet(hs).register(http_server)
PushersSetRestServlet(hs).register(http_server)
+ PushersRemoveRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 644aa4e513..86fbe2747d 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -72,8 +72,6 @@ class RoomCreateRestServlet(ClientV1RestServlet):
def get_room_config(self, request):
user_supplied_config = parse_json_object_from_request(request)
- # default visibility
- user_supplied_config.setdefault("visibility", "public")
return user_supplied_config
def on_OPTIONS(self, request):
@@ -279,8 +277,16 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
- handler = self.handlers.room_list_handler
- data = yield handler.get_public_room_list()
+ try:
+ yield self.auth.get_user_by_req(request)
+ except AuthError:
+ # This endpoint isn't authed, but its useful to know who's hitting
+ # it if they *do* supply an access token
+ pass
+
+ handler = self.hs.get_room_list_handler()
+ data = yield handler.get_aggregated_public_room_list()
+
defer.returnValue((200, data))
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index c88c270537..9a84873a5f 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -35,7 +35,7 @@ class PasswordRestServlet(RestServlet):
super(PasswordRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
- self.auth_handler = hs.get_handlers().auth_handler
+ self.auth_handler = hs.get_auth_handler()
@defer.inlineCallbacks
def on_POST(self, request):
@@ -97,7 +97,7 @@ class ThreepidRestServlet(RestServlet):
self.hs = hs
self.identity_handler = hs.get_handlers().identity_handler
self.auth = hs.get_auth()
- self.auth_handler = hs.get_handlers().auth_handler
+ self.auth_handler = hs.get_auth_handler()
@defer.inlineCallbacks
def on_GET(self, request):
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 78181b7b18..58d3cad6a1 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -104,7 +104,7 @@ class AuthRestServlet(RestServlet):
super(AuthRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
- self.auth_handler = hs.get_handlers().auth_handler
+ self.auth_handler = hs.get_auth_handler()
self.registration_handler = hs.get_handlers().registration_handler
@defer.inlineCallbacks
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 1ecc02d94d..2088c316d1 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -49,7 +49,7 @@ class RegisterRestServlet(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
- self.auth_handler = hs.get_handlers().auth_handler
+ self.auth_handler = hs.get_auth_handler()
self.registration_handler = hs.get_handlers().registration_handler
self.identity_handler = hs.get_handlers().identity_handler
diff --git a/synapse/rest/client/v2_alpha/tokenrefresh.py b/synapse/rest/client/v2_alpha/tokenrefresh.py
index a158c2209a..8270e8787f 100644
--- a/synapse/rest/client/v2_alpha/tokenrefresh.py
+++ b/synapse/rest/client/v2_alpha/tokenrefresh.py
@@ -38,7 +38,7 @@ class TokenRefreshRestServlet(RestServlet):
body = parse_json_object_from_request(request)
try:
old_refresh_token = body["refresh_token"]
- auth_handler = self.hs.get_handlers().auth_handler
+ auth_handler = self.hs.get_auth_handler()
(user_id, new_refresh_token) = yield self.store.exchange_refresh_token(
old_refresh_token, auth_handler.generate_refresh_token)
new_access_token = yield auth_handler.issue_access_token(user_id)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index d96bf9afe2..2468c3ac42 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -26,6 +26,7 @@ from .thumbnailer import Thumbnailer
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.util.stringutils import random_string
+from synapse.api.errors import SynapseError
from twisted.internet import defer, threads
@@ -134,10 +135,15 @@ class MediaRepository(object):
request_path = "/".join((
"/_matrix/media/v1/download", server_name, media_id,
))
- length, headers = yield self.client.get_file(
- server_name, request_path, output_stream=f,
- max_size=self.max_upload_size,
- )
+ try:
+ length, headers = yield self.client.get_file(
+ server_name, request_path, output_stream=f,
+ max_size=self.max_upload_size,
+ )
+ except Exception as e:
+ logger.warn("Failed to fetch remoted media %r", e)
+ raise SynapseError(502, "Failed to fetch remoted media")
+
media_type = headers["Content-Type"][0]
time_now_ms = self.clock.time_msec()
diff --git a/synapse/server.py b/synapse/server.py
index 01f828819f..dd4b81c658 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -22,6 +22,8 @@
from twisted.web.client import BrowserLikePolicyForHTTPS
from twisted.enterprise import adbapi
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.appservice.api import ApplicationServiceApi
from synapse.federation import initialize_http_replication
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
from synapse.notifier import Notifier
@@ -30,6 +32,9 @@ from synapse.handlers import Handlers
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
+from synapse.handlers.room import RoomListHandler
+from synapse.handlers.auth import AuthHandler
+from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.state import StateHandler
from synapse.storage import DataStore
from synapse.util import Clock
@@ -84,6 +89,11 @@ class HomeServer(object):
'presence_handler',
'sync_handler',
'typing_handler',
+ 'room_list_handler',
+ 'auth_handler',
+ 'application_service_api',
+ 'application_service_scheduler',
+ 'application_service_handler',
'notifier',
'distributor',
'client_resource',
@@ -179,6 +189,21 @@ class HomeServer(object):
def build_sync_handler(self):
return SyncHandler(self)
+ def build_room_list_handler(self):
+ return RoomListHandler(self)
+
+ def build_auth_handler(self):
+ return AuthHandler(self)
+
+ def build_application_service_api(self):
+ return ApplicationServiceApi(self)
+
+ def build_application_service_scheduler(self):
+ return ApplicationServiceScheduler(self)
+
+ def build_application_service_handler(self):
+ return ApplicationServicesHandler(self)
+
def build_event_sources(self):
return EventSources(self)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d970fde9e8..e93c3de66c 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
-from ._base import Cache
+from ._base import LoggingTransaction
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
@@ -45,6 +45,7 @@ from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
from .openid import OpenIdStore
+from .client_ips import ClientIpStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
@@ -58,12 +59,6 @@ import logging
logger = logging.getLogger(__name__)
-# Number of msec of granularity to store the user IP 'last seen' time. Smaller
-# times give more inserts into the database even for readonly API hits
-# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
-
-
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore,
PresenceStore, TransactionStore,
@@ -84,17 +79,14 @@ class DataStore(RoomMemberStore, RoomStore,
AccountDataStore,
EventPushActionsStore,
OpenIdStore,
+ ClientIpStore,
):
def __init__(self, db_conn, hs):
self.hs = hs
+ self._clock = hs.get_clock()
self.database_engine = hs.database_engine
- self.client_ip_last_seen = Cache(
- name="client_ip_last_seen",
- keylen=4,
- )
-
self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering",
extra_tables=[("local_invites", "stream_id")]
@@ -148,7 +140,7 @@ class DataStore(RoomMemberStore, RoomStore,
"AccountDataAndTagsChangeCache", account_max,
)
- self.__presence_on_startup = self._get_active_presence(db_conn)
+ self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self._get_cache_dict(
db_conn, "presence_stream",
@@ -173,11 +165,24 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=push_rules_prefill,
)
+ cur = LoggingTransaction(
+ db_conn.cursor(),
+ name="_find_stream_orderings_for_times_txn",
+ database_engine=self.database_engine,
+ after_callbacks=[]
+ )
+ self._find_stream_orderings_for_times_txn(cur)
+ cur.close()
+
+ self.find_stream_orderings_looping_call = self._clock.looping_call(
+ self._find_stream_orderings_for_times, 60 * 60 * 1000
+ )
+
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):
- active_on_startup = self.__presence_on_startup
- self.__presence_on_startup = None
+ active_on_startup = self._presence_on_startup
+ self._presence_on_startup = None
return active_on_startup
def _get_active_presence(self, db_conn):
@@ -203,39 +208,6 @@ class DataStore(RoomMemberStore, RoomStore,
return [UserPresenceState(**row) for row in rows]
@defer.inlineCallbacks
- def insert_client_ip(self, user, access_token, ip, user_agent):
- now = int(self._clock.time_msec())
- key = (user.to_string(), access_token, ip)
-
- try:
- last_seen = self.client_ip_last_seen.get(key)
- except KeyError:
- last_seen = None
-
- # Rate-limited inserts
- if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
- defer.returnValue(None)
-
- self.client_ip_last_seen.prefill(key, now)
-
- # It's safe not to lock here: a) no unique constraint,
- # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
- yield self._simple_upsert(
- "user_ips",
- keyvalues={
- "user_id": user.to_string(),
- "access_token": access_token,
- "ip": ip,
- "user_agent": user_agent,
- },
- values={
- "last_seen": now,
- },
- desc="insert_client_ip",
- lock=False,
- )
-
- @defer.inlineCallbacks
def count_daily_users(self):
"""
Counts the number of users who used this homeserver in the last 24 hours.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e0d7098692..32c6677d47 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -152,8 +152,8 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
- self._db_pool = hs.get_db_pool()
self._clock = hs.get_clock()
+ self._db_pool = hs.get_db_pool()
self._previous_txn_total_time = 0
self._current_txn_total_time = 0
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index ec7e8d40d2..3fa226e92d 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -138,6 +138,9 @@ class AccountDataStore(SQLBaseStore):
A deferred pair of lists of tuples of stream_id int, user_id string,
room_id string, type string, and content string.
"""
+ if last_room_id == current_id and last_global_id == current_id:
+ return defer.succeed(([], []))
+
def get_updated_account_data_txn(txn):
sql = (
"SELECT stream_id, user_id, account_data_type, content"
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index feb9d228ae..d1ee533fac 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -298,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
dict(txn_id=txn_id, as_id=service.id)
)
+ @defer.inlineCallbacks
def get_oldest_unsent_txn(self, service):
"""Get the oldest transaction which has not been sent for this
service.
@@ -308,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
A Deferred which resolves to an AppServiceTransaction or
None.
"""
- return self.runInteraction(
+ entry = yield self.runInteraction(
"get_oldest_unsent_appservice_txn",
self._get_oldest_unsent_txn,
service
)
+ if not entry:
+ defer.returnValue(None)
+
+ event_ids = json.loads(entry["event_ids"])
+
+ events = yield self._get_events(event_ids)
+
+ defer.returnValue(AppServiceTransaction(
+ service=service, id=entry["txn_id"], events=events
+ ))
+
def _get_oldest_unsent_txn(self, txn, service):
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
@@ -328,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
entry = rows[0]
- event_ids = json.loads(entry["event_ids"])
- events = self._get_events_txn(txn, event_ids)
-
- return AppServiceTransaction(
- service=service, id=entry["txn_id"], events=events
- )
+ return entry
def _get_last_txn(self, txn, service_id):
txn.execute(
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
new file mode 100644
index 0000000000..a90990e006
--- /dev/null
+++ b/synapse/storage/client_ips.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore, Cache
+
+from twisted.internet import defer
+
+
+# Number of msec of granularity to store the user IP 'last seen' time. Smaller
+# times give more inserts into the database even for readonly API hits
+# 120 seconds == 2 minutes
+LAST_SEEN_GRANULARITY = 120 * 1000
+
+
+class ClientIpStore(SQLBaseStore):
+
+ def __init__(self, hs):
+ self.client_ip_last_seen = Cache(
+ name="client_ip_last_seen",
+ keylen=4,
+ )
+
+ super(ClientIpStore, self).__init__(hs)
+
+ @defer.inlineCallbacks
+ def insert_client_ip(self, user, access_token, ip, user_agent):
+ now = int(self._clock.time_msec())
+ key = (user.to_string(), access_token, ip)
+
+ try:
+ last_seen = self.client_ip_last_seen.get(key)
+ except KeyError:
+ last_seen = None
+
+ # Rate-limited inserts
+ if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+ defer.returnValue(None)
+
+ self.client_ip_last_seen.prefill(key, now)
+
+ # It's safe not to lock here: a) no unique constraint,
+ # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
+ yield self._simple_upsert(
+ "user_ips",
+ keyvalues={
+ "user_id": user.to_string(),
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ },
+ values={
+ "last_seen": now,
+ },
+ desc="insert_client_ip",
+ lock=False,
+ )
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 9705db5c47..940e11d7a2 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore):
+ def __init__(self, hs):
+ self.stream_ordering_month_ago = None
+ super(EventPushActionsStore, self).__init__(hs)
+
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@@ -115,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range(self, user_id,
min_stream_ordering,
- max_stream_ordering=None):
+ max_stream_ordering=None,
+ limit=20):
def get_after_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
@@ -147,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore):
if max_stream_ordering is not None:
sql += " AND ep.stream_ordering <= ?"
args.append(max_stream_ordering)
- sql += " ORDER BY ep.stream_ordering ASC"
+ sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+ args.append(limit)
txn.execute(sql, args)
return txn.fetchall()
after_read_receipt = yield self.runInteraction(
@@ -224,18 +230,93 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id)
)
- def _remove_push_actions_before_txn(self, txn, room_id, user_id,
- topological_ordering):
+ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+ topological_ordering):
+ """
+ Purges old, stale push actions for a user and room before a given
+ topological_ordering
+ Args:
+ txn: The transcation
+ room_id: Room ID to delete from
+ user_id: user ID to delete for
+ topological_ordering: The lowest topological ordering which will
+ not be deleted.
+ """
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id, )
)
+
+ # We need to join on the events table to get the received_ts for
+ # event_push_actions and sqlite won't let us use a join in a delete so
+ # we can't just delete where received_ts < x. Furthermore we can
+ # only identify event_push_actions by a tuple of room_id, event_id
+ # we we can't use a subquery.
+ # Instead, we look up the stream ordering for the last event in that
+ # room received before the threshold time and delete event_push_actions
+ # in the room with a stream_odering before that.
txn.execute(
- "DELETE FROM event_push_actions"
- " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
- (room_id, user_id, topological_ordering,)
+ "DELETE FROM event_push_actions "
+ " WHERE user_id = ? AND room_id = ? AND "
+ " topological_ordering < ? AND stream_ordering < ?",
+ (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+ )
+
+ @defer.inlineCallbacks
+ def _find_stream_orderings_for_times(self):
+ yield self.runInteraction(
+ "_find_stream_orderings_for_times",
+ self._find_stream_orderings_for_times_txn
+ )
+
+ def _find_stream_orderings_for_times_txn(self, txn):
+ logger.info("Searching for stream ordering 1 month ago")
+ self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 month ago: it's %d",
+ self.stream_ordering_month_ago
)
+ def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+ """
+ Find the stream_ordering of the first event that was received after
+ a given timestamp. This is relatively slow as there is no index on
+ received_ts but we can then use this to delete push actions before
+ this.
+
+ received_ts must necessarily be in the same order as stream_ordering
+ and stream_ordering is indexed, so we manually binary search using
+ stream_ordering
+ """
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ max_stream_ordering = txn.fetchone()[0]
+
+ if max_stream_ordering is None:
+ return 0
+
+ range_start = 0
+ range_end = max_stream_ordering
+
+ sql = (
+ "SELECT received_ts FROM events"
+ " WHERE stream_ordering > ?"
+ " ORDER BY stream_ordering"
+ " LIMIT 1"
+ )
+
+ while range_end - range_start > 1:
+ middle = int((range_end + range_start) / 2)
+ txn.execute(sql, (middle,))
+ middle_ts = txn.fetchone()[0]
+ if ts > middle_ts:
+ range_start = middle
+ else:
+ range_end = middle
+
+ return range_end
+
def _action_has_highlight(actions):
for action in actions:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4655669ba0..6d978ffcd5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json
from collections import deque, namedtuple
+import synapse
+import synapse.metrics
+
import logging
import math
@@ -35,6 +38,10 @@ import ujson as json
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
def encode_json(json_object):
if USE_FROZEN_DICTS:
# ujson doesn't like frozen_dicts
@@ -139,6 +146,9 @@ class _EventPeristenceQueue(object):
pass
+_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+
+
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
@@ -258,6 +268,7 @@ class EventsStore(SQLBaseStore):
events_and_contexts=chunk,
backfilled=backfilled,
)
+ persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks
@log_function
@@ -275,6 +286,7 @@ class EventsStore(SQLBaseStore):
current_state=current_state,
backfilled=backfilled,
)
+ persist_event_counter.inc()
except _RollbackButIsFineException:
pass
@@ -342,9 +354,6 @@ class EventsStore(SQLBaseStore):
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
- txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
@@ -635,6 +644,8 @@ class EventsStore(SQLBaseStore):
],
)
+ self._add_to_cache(txn, events_and_contexts)
+
if backfilled:
# Backfilled events come before the current state so we don't need
# to update the current state table
@@ -676,6 +687,45 @@ class EventsStore(SQLBaseStore):
return
+ def _add_to_cache(self, txn, events_and_contexts):
+ to_prefill = []
+
+ rows = []
+ N = 200
+ for i in range(0, len(events_and_contexts), N):
+ ev_map = {
+ e[0].event_id: e[0]
+ for e in events_and_contexts[i:i + N]
+ }
+ if not ev_map:
+ break
+
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE e.event_id IN (%s)"
+ ) % (",".join(["?"] * len(ev_map)),)
+
+ txn.execute(sql, ev_map.keys())
+ rows = self.cursor_to_dict(txn)
+ for row in rows:
+ event = ev_map[row["event_id"]]
+ if not row["rejects"] and not row["redacts"]:
+ to_prefill.append(_EventCacheEntry(
+ event=event,
+ redacted_event=None,
+ ))
+
+ def prefill():
+ for cache_entry in to_prefill:
+ self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+ txn.call_after(prefill)
+
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
@@ -741,100 +791,65 @@ class EventsStore(SQLBaseStore):
event_id_list = event_ids
event_ids = set(event_ids)
- event_map = self._get_events_from_cache(
+ event_entry_map = self._get_events_from_cache(
event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
- missing_events_ids = [e for e in event_ids if e not in event_map]
+ missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
- event_map.update(missing_events)
-
- defer.returnValue([
- event_map[e_id] for e_id in event_id_list
- if e_id in event_map and event_map[e_id]
- ])
-
- def _get_events_txn(self, txn, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not event_ids:
- return []
-
- event_map = self._get_events_from_cache(
- event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ event_entry_map.update(missing_events)
- missing_events_ids = [e for e in event_ids if e not in event_map]
+ events = []
+ for event_id in event_id_list:
+ entry = event_entry_map.get(event_id, None)
+ if not entry:
+ continue
- if not missing_events_ids:
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
+ if allow_rejected or not entry.event.rejected_reason:
+ if check_redacted and entry.redacted_event:
+ event = entry.redacted_event
+ else:
+ event = entry.event
- missing_events = self._fetch_events_txn(
- txn,
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ events.append(event)
- event_map.update(missing_events)
+ if get_prev_content:
+ if "replaces_state" in event.unsigned:
+ prev = yield self.get_event(
+ event.unsigned["replaces_state"],
+ get_prev_content=False,
+ allow_none=True,
+ )
+ if prev:
+ event.unsigned = dict(event.unsigned)
+ event.unsigned["prev_content"] = prev.content
+ event.unsigned["prev_sender"] = prev.sender
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
+ defer.returnValue(events)
def _invalidate_get_event_cache(self, event_id):
- for check_redacted in (False, True):
- for get_prev_content in (False, True):
- self._get_event_cache.invalidate(
- (event_id, check_redacted, get_prev_content)
- )
-
- def _get_event_txn(self, txn, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
-
- events = self._get_events_txn(
- txn, [event_id],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ self._get_event_cache.invalidate((event_id,))
- return events[0] if events else None
-
- def _get_events_from_cache(self, events, check_redacted, get_prev_content,
- allow_rejected):
+ def _get_events_from_cache(self, events, allow_rejected):
event_map = {}
for event_id in events:
- try:
- ret = self._get_event_cache.get(
- (event_id, check_redacted, get_prev_content,)
- )
+ ret = self._get_event_cache.get((event_id,), None)
+ if not ret:
+ continue
- if allow_rejected or not ret.rejected_reason:
- event_map[event_id] = ret
- else:
- event_map[event_id] = None
- except KeyError:
- pass
+ if allow_rejected or not ret.event.rejected_reason:
+ event_map[event_id] = ret
+ else:
+ event_map[event_id] = None
return event_map
@@ -905,8 +920,7 @@ class EventsStore(SQLBaseStore):
reactor.callFromThread(fire, event_list)
@defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
+ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
@@ -944,8 +958,6 @@ class EventsStore(SQLBaseStore):
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
rejected_reason=row["rejects"],
)
for row in rows
@@ -954,7 +966,7 @@ class EventsStore(SQLBaseStore):
)
defer.returnValue({
- e.event_id: e
+ e.event.event_id: e
for e in res if e
})
@@ -984,37 +996,8 @@ class EventsStore(SQLBaseStore):
return rows
- def _fetch_events_txn(self, txn, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not events:
- return {}
-
- rows = self._fetch_event_rows(
- txn, events,
- )
-
- if not allow_rejected:
- rows[:] = [r for r in rows if not r["rejects"]]
-
- res = [
- self._get_event_from_row_txn(
- txn,
- row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- rejected_reason=row["rejects"],
- )
- for row in rows
- ]
-
- return {
- r.event_id: r
- for r in res
- }
-
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
rejected_reason=None):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -1024,26 +1007,27 @@ class EventsStore(SQLBaseStore):
table="rejections",
keyvalues={"event_id": rejected_reason},
retcol="reason",
- desc="_get_event_from_row",
+ desc="_get_event_from_row_rejected_reason",
)
- ev = FrozenEvent(
+ original_ev = FrozenEvent(
d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
- if check_redacted and redacted:
- ev = prune_event(ev)
+ redacted_event = None
+ if redacted:
+ redacted_event = prune_event(original_ev)
redaction_id = yield self._simple_select_one_onecol(
table="redactions",
- keyvalues={"redacts": ev.event_id},
+ keyvalues={"redacts": redacted_event.event_id},
retcol="event_id",
- desc="_get_event_from_row",
+ desc="_get_event_from_row_redactions",
)
- ev.unsigned["redacted_by"] = redaction_id
+ redacted_event.unsigned["redacted_by"] = redaction_id
# Get the redaction event.
because = yield self.get_event(
@@ -1055,86 +1039,16 @@ class EventsStore(SQLBaseStore):
if because:
# It's fine to do add the event directly, since get_pdu_json
# will serialise this field correctly
- ev.unsigned["redacted_because"] = because
+ redacted_event.unsigned["redacted_because"] = because
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = yield self.get_event(
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- allow_none=True,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
+ cache_entry = _EventCacheEntry(
+ event=original_ev,
+ redacted_event=redacted_event,
)
- defer.returnValue(ev)
-
- def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
- rejected_reason=None):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = self._simple_select_one_onecol_txn(
- txn,
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- )
-
- ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
-
- if check_redacted and redacted:
- ev = prune_event(ev)
-
- redaction_id = self._simple_select_one_onecol_txn(
- txn,
- table="redactions",
- keyvalues={"redacts": ev.event_id},
- retcol="event_id",
- )
-
- ev.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
-
- because = self._get_event_txn(
- txn,
- redaction_id,
- check_redacted=False
- )
-
- if because:
- ev.unsigned["redacted_because"] = because
-
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = self._get_event_txn(
- txn,
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
-
- return ev
-
- def _parse_events_txn(self, txn, rows):
- event_ids = [r["event_id"] for r in rows]
+ self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
- return self._get_events_txn(txn, event_ids)
+ defer.returnValue(cache_entry)
@defer.inlineCallbacks
def count_daily_messages(self):
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 3fab57a7e8..d03f7c541e 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -118,6 +118,9 @@ class PresenceStore(SQLBaseStore):
)
def get_all_presence_updates(self, last_id, current_id):
+ if last_id == current_id:
+ return defer.succeed([])
+
def get_all_presence_updates_txn(txn):
sql = (
"SELECT stream_id, user_id, state, last_active_ts,"
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index d2bf7f2aec..8183b7f1b0 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -14,7 +14,8 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
+from synapse.push.baserules import list_with_base_rules
from twisted.internet import defer
import logging
@@ -23,8 +24,31 @@ import simplejson as json
logger = logging.getLogger(__name__)
+def _load_rules(rawrules, enabled_map):
+ ruleslist = []
+ for rawrule in rawrules:
+ rule = dict(rawrule)
+ rule["conditions"] = json.loads(rawrule["conditions"])
+ rule["actions"] = json.loads(rawrule["actions"])
+ ruleslist.append(rule)
+
+ # We're going to be mutating this a lot, so do a deep copy
+ rules = list(list_with_base_rules(ruleslist))
+
+ for i, rule in enumerate(rules):
+ rule_id = rule['rule_id']
+ if rule_id in enabled_map:
+ if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+ # Rules are cached across users.
+ rule = dict(rule)
+ rule['enabled'] = bool(enabled_map[rule_id])
+ rules[i] = rule
+
+ return rules
+
+
class PushRuleStore(SQLBaseStore):
- @cachedInlineCallbacks()
+ @cachedInlineCallbacks(lru=True)
def get_push_rules_for_user(self, user_id):
rows = yield self._simple_select_list(
table="push_rules",
@@ -42,9 +66,13 @@ class PushRuleStore(SQLBaseStore):
key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
)
- defer.returnValue(rows)
+ enabled_map = yield self.get_push_rules_enabled_for_user(user_id)
- @cachedInlineCallbacks()
+ rules = _load_rules(rows, enabled_map)
+
+ defer.returnValue(rules)
+
+ @cachedInlineCallbacks(lru=True)
def get_push_rules_enabled_for_user(self, user_id):
results = yield self._simple_select_list(
table="push_rules_enable",
@@ -60,12 +88,16 @@ class PushRuleStore(SQLBaseStore):
r['rule_id']: False if r['enabled'] == 0 else True for r in results
})
- @defer.inlineCallbacks
+ @cachedList(cached_method_name="get_push_rules_for_user",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules(self, user_ids):
if not user_ids:
defer.returnValue({})
- results = {}
+ results = {
+ user_id: []
+ for user_id in user_ids
+ }
rows = yield self._simple_select_many_batch(
table="push_rules",
@@ -75,18 +107,32 @@ class PushRuleStore(SQLBaseStore):
desc="bulk_get_push_rules",
)
- rows.sort(key=lambda e: (-e["priority_class"], -e["priority"]))
+ rows.sort(
+ key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
+ )
for row in rows:
results.setdefault(row['user_name'], []).append(row)
+
+ enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
+
+ for user_id, rules in results.items():
+ results[user_id] = _load_rules(
+ rules, enabled_map_by_user.get(user_id, {})
+ )
+
defer.returnValue(results)
- @defer.inlineCallbacks
+ @cachedList(cached_method_name="get_push_rules_enabled_for_user",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules_enabled(self, user_ids):
if not user_ids:
defer.returnValue({})
- results = {}
+ results = {
+ user_id: {}
+ for user_id in user_ids
+ }
rows = yield self._simple_select_many_batch(
table="push_rules_enable",
@@ -96,7 +142,8 @@ class PushRuleStore(SQLBaseStore):
desc="bulk_get_push_rules_enabled",
)
for row in rows:
- results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
+ enabled = bool(row['enabled'])
+ results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
defer.returnValue(results)
@defer.inlineCallbacks
@@ -374,6 +421,9 @@ class PushRuleStore(SQLBaseStore):
def get_all_push_rule_updates(self, last_id, current_id, limit):
"""Get all the push rules changes that have happend on the server"""
+ if last_id == current_id:
+ return defer.succeed([])
+
def get_all_push_rule_updates_txn(txn):
sql = (
"SELECT stream_id, event_stream_ordering, user_id, rule_id,"
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 9e8e2e2964..a7d7c54d7e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
import logging
import simplejson as json
@@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
- @cachedInlineCallbacks(num_args=1)
- def get_users_with_pushers_in_room(self, room_id):
- users = yield self.get_users_in_room(room_id)
-
+ @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
+ def get_if_user_has_pusher(self, user_id):
result = yield self._simple_select_many_batch(
table='pushers',
+ keyvalues={
+ 'user_name': 'user_id',
+ },
+ retcol='user_name',
+ desc='get_if_user_has_pusher',
+ allow_none=True,
+ )
+
+ defer.returnValue(bool(result))
+
+ @cachedList(cached_method_name="get_if_user_has_pusher",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
+ def get_if_users_have_pushers(self, user_ids):
+ rows = yield self._simple_select_many_batch(
+ table='pushers',
column='user_name',
- iterable=users,
+ iterable=user_ids,
retcols=['user_name'],
- desc='get_users_with_pushers_in_room'
+ desc='get_if_users_have_pushers'
)
- defer.returnValue([r['user_name'] for r in result])
+ result = {user_id: False for user_id in user_ids}
+ result.update({r['user_name']: True for r in rows})
+
+ defer.returnValue(result)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@@ -178,16 +194,16 @@ class PusherStore(SQLBaseStore):
},
)
if newly_inserted:
- # get_users_with_pushers_in_room only cares if the user has
+ # get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
- txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
yield self.runInteraction("add_pusher", f)
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
- txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
self._simple_delete_one_txn(
txn,
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fdcf28f3e1..8c26f39fbb 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -34,6 +34,26 @@ class ReceiptsStore(SQLBaseStore):
"ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
)
+ @cachedInlineCallbacks()
+ def get_users_with_read_receipts_in_room(self, room_id):
+ receipts = yield self.get_receipts_for_room(room_id, "m.read")
+ defer.returnValue(set(r['user_id'] for r in receipts))
+
+ def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type,
+ user_id):
+ if receipt_type != "m.read":
+ return
+
+ # Returns an ObservableDeferred
+ res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
+
+ if res and res.called and user_id in res.result:
+ # We'd only be adding to the set, so no point invalidating if the
+ # user is already there
+ return
+
+ self.get_users_with_read_receipts_in_room.invalidate((room_id,))
+
@cached(num_args=2)
def get_receipts_for_room(self, room_id, receipt_type):
return self._simple_select_list(
@@ -229,6 +249,10 @@ class ReceiptsStore(SQLBaseStore):
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
txn.call_after(
+ self._invalidate_get_users_with_receipts_in_room,
+ room_id, receipt_type, user_id,
+ )
+ txn.call_after(
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
@@ -297,7 +321,7 @@ class ReceiptsStore(SQLBaseStore):
)
if receipt_type == "m.read" and topological_ordering:
- self._remove_push_actions_before_txn(
+ self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
user_id=user_id,
@@ -374,6 +398,10 @@ class ReceiptsStore(SQLBaseStore):
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
txn.call_after(
+ self._invalidate_get_users_with_receipts_in_room,
+ room_id, receipt_type, user_id,
+ )
+ txn.call_after(
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 26933e593a..97f9f1929c 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore):
@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
- def f(txn):
+ def get_room_name(txn):
sql = (
- "SELECT event_id FROM current_state_events "
- "WHERE room_id = ? "
+ "SELECT name FROM room_names"
+ " INNER JOIN current_state_events USING (room_id, event_id)"
+ " WHERE room_id = ?"
+ " LIMIT 1"
)
- sql += " AND ((type = 'm.room.name' AND state_key = '')"
- sql += " OR type = 'm.room.aliases')"
-
txn.execute(sql, (room_id,))
- results = self.cursor_to_dict(txn)
+ rows = txn.fetchall()
+ if rows:
+ return rows[0][0]
+ else:
+ return None
- return self._parse_events_txn(txn, results)
+ return [row[0] for row in txn.fetchall()]
- events = yield self.runInteraction("get_room_name_and_aliases", f)
+ def get_room_aliases(txn):
+ sql = (
+ "SELECT content FROM current_state_events"
+ " INNER JOIN events USING (room_id, event_id)"
+ " WHERE room_id = ?"
+ )
+ txn.execute(sql, (room_id,))
+ return [row[0] for row in txn.fetchall()]
+
+ name = yield self.runInteraction("get_room_name", get_room_name)
+ alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
- name = None
aliases = []
- for e in events:
- if e.type == 'm.room.name':
- if 'name' in e.content:
- name = e.content['name']
- elif e.type == 'm.room.aliases':
- if 'aliases' in e.content:
- aliases.extend(e.content['aliases'])
+ for c in alias_contents:
+ try:
+ content = json.loads(c)
+ except:
+ continue
+
+ aliases.extend(content.get('aliases', []))
defer.returnValue((name, aliases))
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index face685ed2..8bd693be72 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -59,9 +59,6 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
- txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
)
@@ -241,30 +238,10 @@ class RoomMemberStore(SQLBaseStore):
return results
- @cached(max_entries=5000)
+ @cachedInlineCallbacks(max_entries=5000)
def get_joined_hosts_for_room(self, room_id):
- return self.runInteraction(
- "get_joined_hosts_for_room",
- self._get_joined_hosts_for_room_txn,
- room_id,
- )
-
- def _get_joined_hosts_for_room_txn(self, txn, room_id):
- rows = self._get_members_rows_txn(
- txn,
- room_id, membership=Membership.JOIN
- )
-
- joined_domains = set(get_domain_from_id(r["user_id"]) for r in rows)
-
- return joined_domains
-
- def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
- rows = self._get_members_rows_txn(
- txn,
- room_id, membership, user_id,
- )
- return [r["event_id"] for r in rows]
+ user_ids = yield self.get_users_in_room(room_id)
+ defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids))
def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
where_clause = "c.room_id = ?"
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index b417e3ac08..5b7d8d1ab5 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from synapse.storage.appservice import ApplicationServiceStore
+from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
logger.warning("Could not get app_service_config_files from config")
pass
- appservices = ApplicationServiceStore.load_appservices(
+ appservices = load_appservices(
config.server_name, config_files
)
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 0224299625..12941d1775 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
import re
+import ujson as json
logger = logging.getLogger(__name__)
@@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
- "SELECT stream_ordering, event_id FROM events"
+ "SELECT stream_ordering, event_id, room_id, type, content FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
@@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore):
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
- rows = txn.fetchall()
+ rows = self.cursor_to_dict(txn)
if not rows:
return 0
- min_stream_id = rows[-1][0]
- event_ids = [row[1] for row in rows]
-
- events = self._get_events_txn(txn, event_ids)
+ min_stream_id = rows[-1]["stream_ordering"]
event_search_rows = []
- for event in events:
+ for row in rows:
try:
- event_id = event.event_id
- room_id = event.room_id
- content = event.content
- if event.type == "m.room.message":
+ event_id = row["event_id"]
+ room_id = row["room_id"]
+ etype = row["type"]
+ try:
+ content = json.loads(row["content"])
+ except:
+ continue
+
+ if etype == "m.room.message":
key = "content.body"
value = content["body"]
- elif event.type == "m.room.topic":
+ elif etype == "m.room.topic":
key = "content.topic"
value = content["topic"]
- elif event.type == "m.room.name":
+ elif etype == "m.room.name":
key = "content.name"
value = content["name"]
except (KeyError, AttributeError):
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index b10f2a5787..ea6823f18d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -19,17 +19,24 @@ from ._base import SQLBaseStore
from unpaddedbase64 import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash
+from synapse.util.caches.descriptors import cached, cachedList
class SignatureStore(SQLBaseStore):
"""Persistence for event signatures and hashes"""
+ @cached(lru=True)
+ def get_event_reference_hash(self, event_id):
+ return self._get_event_reference_hashes_txn(event_id)
+
+ @cachedList(cached_method_name="get_event_reference_hash",
+ list_name="event_ids", num_args=1)
def get_event_reference_hashes(self, event_ids):
def f(txn):
- return [
- self._get_event_reference_hashes_txn(txn, ev)
- for ev in event_ids
- ]
+ return {
+ event_id: self._get_event_reference_hashes_txn(txn, event_id)
+ for event_id in event_ids
+ }
return self.runInteraction(
"get_event_reference_hashes",
@@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore):
hashes = yield self.get_event_reference_hashes(
event_ids
)
- hashes = [
- {
+ hashes = {
+ e_id: {
k: encode_base64(v) for k, v in h.items()
if k == "sha256"
}
- for h in hashes
- ]
+ for e_id, h in hashes.items()
+ }
- defer.returnValue(zip(event_ids, hashes))
+ defer.returnValue(hashes.items())
def _get_event_reference_hashes_txn(self, txn, event_id):
"""Get all the hashes for a given PDU.
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index ab991e877d..ada20706dc 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -132,29 +132,25 @@ class StreamStore(SQLBaseStore):
return True
return False
- ret = self._get_events_txn(
- txn,
- # apply the filter on the room id list
- [
- r["event_id"] for r in rows
- if app_service_interested(r)
- ],
- get_prev_content=True
- )
+ return [r for r in rows if app_service_interested(r)]
- self._set_before_and_after(ret, rows)
+ rows = yield self.runInteraction("get_appservice_room_stream", f)
- if rows:
- key = "s%d" % max(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = to_key
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
- return ret, key
+ self._set_before_and_after(ret, rows, topo_order=from_id is None)
- results = yield self.runInteraction("get_appservice_room_stream", f)
- defer.returnValue(results)
+ if rows:
+ key = "s%d" % max(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = to_key
+
+ defer.returnValue((ret, key))
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 9da23f34cb..5a2c1aa59b 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -68,6 +68,9 @@ class TagsStore(SQLBaseStore):
A deferred list of tuples of stream_id int, user_id string,
room_id string, tag string and content string.
"""
+ if last_id == current_id:
+ defer.returnValue([])
+
def get_all_updated_tags_txn(txn):
sql = (
"SELECT stream_id, user_id, room_id"
diff --git a/synapse/types.py b/synapse/types.py
index cf950a0c36..bde9e8e2c5 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -25,7 +25,10 @@ Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
def get_domain_from_id(string):
- return string.split(":", 1)[1]
+ try:
+ return string.split(":", 1)[1]
+ except IndexError:
+ raise SynapseError(400, "Invalid ID: %r", string)
class DomainSpecificString(
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 0d6f48e2d8..40be7fe7e3 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -102,6 +102,15 @@ class ObservableDeferred(object):
def observers(self):
return self._observers
+ def has_called(self):
+ return self._result is not None
+
+ def has_succeeded(self):
+ return self._result is not None and self._result[0] is True
+
+ def get_result(self):
+ return self._result[1]
+
def __getattr__(self, name):
return getattr(self._deferred, name)
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index d53569ca49..ebd715c5dc 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -24,11 +24,21 @@ DEBUG_CACHES = False
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {}
-cache_counter = metrics.register_cache(
- "cache",
- lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
- labels=["name"],
-)
+# cache_counter = metrics.register_cache(
+# "cache",
+# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
+# labels=["name"],
+# )
+
+
+def register_cache(name, cache):
+ caches_by_name[name] = cache
+ return metrics.register_cache(
+ "cache",
+ lambda: len(cache),
+ name,
+ )
+
_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
caches_by_name["string_cache"] = _string_cache
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 758f5982b0..f31dfb22b7 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -22,7 +22,7 @@ from synapse.util.logcontext import (
PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
)
-from . import caches_by_name, DEBUG_CACHES, cache_counter
+from . import DEBUG_CACHES, register_cache
from twisted.internet import defer
@@ -33,6 +33,7 @@ import functools
import inspect
import threading
+
logger = logging.getLogger(__name__)
@@ -43,6 +44,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
class Cache(object):
+ __slots__ = (
+ "cache",
+ "max_entries",
+ "name",
+ "keylen",
+ "sequence",
+ "thread",
+ "metrics",
+ )
def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
if lru:
@@ -59,7 +69,7 @@ class Cache(object):
self.keylen = keylen
self.sequence = 0
self.thread = None
- caches_by_name[name] = self.cache
+ self.metrics = register_cache(name, self.cache)
def check_thread(self):
expected_thread = self.thread
@@ -74,10 +84,10 @@ class Cache(object):
def get(self, key, default=_CacheSentinel):
val = self.cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
return val
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
if default is _CacheSentinel:
raise KeyError()
@@ -293,16 +303,21 @@ class CacheListDescriptor(object):
# cached is a dict arg -> deferred, where deferred results in a
# 2-tuple (`arg`, `result`)
- cached = {}
+ results = {}
+ cached_defers = {}
missing = []
for arg in list_args:
key = list(keyargs)
key[self.list_pos] = arg
try:
- res = cache.get(tuple(key)).observe()
- res.addCallback(lambda r, arg: (arg, r), arg)
- cached[arg] = res
+ res = cache.get(tuple(key))
+ if not res.has_succeeded():
+ res = res.observe()
+ res.addCallback(lambda r, arg: (arg, r), arg)
+ cached_defers[arg] = res
+ else:
+ results[arg] = res.get_result()
except KeyError:
missing.append(arg)
@@ -340,12 +355,21 @@ class CacheListDescriptor(object):
res = observer.observe()
res.addCallback(lambda r, arg: (arg, r), arg)
- cached[arg] = res
-
- return preserve_context_over_deferred(defer.gatherResults(
- cached.values(),
- consumeErrors=True,
- ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res)))
+ cached_defers[arg] = res
+
+ if cached_defers:
+ def update_results_dict(res):
+ results.update(res)
+ return results
+
+ return preserve_context_over_deferred(defer.gatherResults(
+ cached_defers.values(),
+ consumeErrors=True,
+ ).addCallback(update_results_dict).addErrback(
+ unwrapFirstError
+ ))
+ else:
+ return results
obj.__dict__[self.orig.__name__] = wrapped
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index f92d80542b..b0ca1bb79d 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -15,7 +15,7 @@
from synapse.util.caches.lrucache import LruCache
from collections import namedtuple
-from . import caches_by_name, cache_counter
+from . import register_cache
import threading
import logging
@@ -43,7 +43,7 @@ class DictionaryCache(object):
__slots__ = []
self.sentinel = Sentinel()
- caches_by_name[name] = self.cache
+ self.metrics = register_cache(name, self.cache)
def check_thread(self):
expected_thread = self.thread
@@ -58,7 +58,7 @@ class DictionaryCache(object):
def get(self, key, dict_keys=None):
entry = self.cache.get(key, self.sentinel)
if entry is not self.sentinel:
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
if dict_keys is None:
return DictionaryEntry(entry.full, dict(entry.value))
@@ -69,7 +69,7 @@ class DictionaryCache(object):
if k in entry.value
})
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
return DictionaryEntry(False, {})
def invalidate(self, key):
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 2b68c1ac93..080388958f 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache
import logging
@@ -49,7 +49,7 @@ class ExpiringCache(object):
self._cache = {}
- caches_by_name[cache_name] = self._cache
+ self.metrics = register_cache(cache_name, self._cache)
def start(self):
if not self._expiry_ms:
@@ -78,9 +78,9 @@ class ExpiringCache(object):
def __getitem__(self, key):
try:
entry = self._cache[key]
- cache_counter.inc_hits(self._cache_name)
+ self.metrics.inc_hits()
except KeyError:
- cache_counter.inc_misses(self._cache_name)
+ self.metrics.inc_misses()
raise
if self._reset_expiry_on_get:
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index ea8a74ca69..3c051dabc4 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache
from blist import sorteddict
@@ -42,7 +42,7 @@ class StreamChangeCache(object):
self._cache = sorteddict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
- caches_by_name[self.name] = self._cache
+ self.metrics = register_cache(self.name, self._cache)
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
@@ -53,19 +53,19 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos < self._earliest_known_stream_pos:
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
return True
latest_entity_change_pos = self._entity_to_key.get(entity, None)
if latest_entity_change_pos is None:
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
return False
if stream_pos < latest_entity_change_pos:
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
return True
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
return False
def get_entities_changed(self, entities, stream_pos):
@@ -82,10 +82,10 @@ class StreamChangeCache(object):
self._cache[k] for k in keys[i:]
).intersection(entities)
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
else:
result = entities
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
return result
diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py
index 3efa8a8206..a6866f6117 100644
--- a/synapse/util/presentable_names.py
+++ b/synapse/util/presentable_names.py
@@ -14,6 +14,9 @@
# limitations under the License.
import re
+import logging
+
+logger = logging.getLogger(__name__)
# intentionally looser than what aliases we allow to be registered since
# other HSes may allow aliases that we would not
@@ -105,13 +108,21 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True):
# or inbound invite, or outbound 3PID invite.
if all_members[0].sender == user_id:
if "m.room.third_party_invite" in room_state_bytype:
- third_party_invites = room_state_bytype["m.room.third_party_invite"]
+ third_party_invites = (
+ room_state_bytype["m.room.third_party_invite"].values()
+ )
+
if len(third_party_invites) > 0:
# technically third party invite events are not member
# events, but they are close enough
- return "Inviting %s" (
- descriptor_from_member_events(third_party_invites)
- )
+
+ # FIXME: no they're not - they look nothing like a member;
+ # they have a great big encrypted thing as their name to
+ # prevent leaking the 3PID name...
+ # return "Inviting %s" % (
+ # descriptor_from_member_events(third_party_invites)
+ # )
+ return "Inviting email address"
else:
return ALL_ALONE
else:
diff --git a/tests/config/test_generate.py b/tests/config/test_generate.py
index 4329d73974..8f57fbeb23 100644
--- a/tests/config/test_generate.py
+++ b/tests/config/test_generate.py
@@ -30,7 +30,7 @@ class ConfigGenerationTestCase(unittest.TestCase):
shutil.rmtree(self.dir)
def test_generate_config_generates_files(self):
- HomeServerConfig.load_config("", [
+ HomeServerConfig.load_or_generate_config("", [
"--generate-config",
"-c", self.file,
"--report-stats=yes",
diff --git a/tests/config/test_load.py b/tests/config/test_load.py
index bf46233c5c..161a87d7e3 100644
--- a/tests/config/test_load.py
+++ b/tests/config/test_load.py
@@ -34,6 +34,8 @@ class ConfigLoadingTestCase(unittest.TestCase):
self.generate_config_and_remove_lines_containing("server_name")
with self.assertRaises(Exception):
HomeServerConfig.load_config("", ["-c", self.file])
+ with self.assertRaises(Exception):
+ HomeServerConfig.load_or_generate_config("", ["-c", self.file])
def test_generates_and_loads_macaroon_secret_key(self):
self.generate_config()
@@ -54,11 +56,24 @@ class ConfigLoadingTestCase(unittest.TestCase):
"was: %r" % (config.macaroon_secret_key,)
)
+ config = HomeServerConfig.load_or_generate_config("", ["-c", self.file])
+ self.assertTrue(
+ hasattr(config, "macaroon_secret_key"),
+ "Want config to have attr macaroon_secret_key"
+ )
+ if len(config.macaroon_secret_key) < 5:
+ self.fail(
+ "Want macaroon secret key to be string of at least length 5,"
+ "was: %r" % (config.macaroon_secret_key,)
+ )
+
def test_load_succeeds_if_macaroon_secret_key_missing(self):
self.generate_config_and_remove_lines_containing("macaroon")
config1 = HomeServerConfig.load_config("", ["-c", self.file])
config2 = HomeServerConfig.load_config("", ["-c", self.file])
+ config3 = HomeServerConfig.load_or_generate_config("", ["-c", self.file])
self.assertEqual(config1.macaroon_secret_key, config2.macaroon_secret_key)
+ self.assertEqual(config1.macaroon_secret_key, config3.macaroon_secret_key)
def test_disable_registration(self):
self.generate_config()
@@ -70,14 +85,17 @@ class ConfigLoadingTestCase(unittest.TestCase):
config = HomeServerConfig.load_config("", ["-c", self.file])
self.assertFalse(config.enable_registration)
+ config = HomeServerConfig.load_or_generate_config("", ["-c", self.file])
+ self.assertFalse(config.enable_registration)
+
# Check that either config value is clobbered by the command line.
- config = HomeServerConfig.load_config("", [
+ config = HomeServerConfig.load_or_generate_config("", [
"-c", self.file, "--enable-registration"
])
self.assertTrue(config.enable_registration)
def generate_config(self):
- HomeServerConfig.load_config("", [
+ HomeServerConfig.load_or_generate_config("", [
"--generate-config",
"-c", self.file,
"--report-stats=yes",
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 7ddbbb9b4a..a884c95f8d 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -30,9 +30,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastore = Mock(return_value=self.mock_store)
- self.handler = ApplicationServicesHandler(
- hs, self.mock_as_api, self.mock_scheduler
- )
+ hs.get_application_service_api = Mock(return_value=self.mock_as_api)
+ hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
+ self.handler = ApplicationServicesHandler(hs)
@defer.inlineCallbacks
def test_notify_interested_services(self):
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 87c795fcfa..b531ba8540 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
@@ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
@@ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={
- user_id: 1,
- }, now=now
+ state, is_mine=True, syncing_user_ids=set([user_id]), now=now
)
self.assertIsNotNone(new_state)
@@ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
@@ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNone(new_state)
@@ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=False, user_to_num_current_syncs={}, now=now
+ state, is_mine=False, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
@@ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index 8b7be96bd9..69a5e5b1d4 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
from .. import unittest
from synapse.handlers.register import RegistrationHandler
+from synapse.types import UserID
from tests.utils import setup_test_homeserver
@@ -36,31 +37,43 @@ class RegistrationTestCase(unittest.TestCase):
self.mock_distributor = Mock()
self.mock_distributor.declare("registered_user")
self.mock_captcha_client = Mock()
- hs = yield setup_test_homeserver(
+ self.hs = yield setup_test_homeserver(
handlers=None,
http_client=None,
expire_access_token=True)
- hs.handlers = RegistrationHandlers(hs)
- self.handler = hs.get_handlers().registration_handler
- hs.get_handlers().profile_handler = Mock()
+ self.auth_handler = Mock(
+ generate_short_term_login_token=Mock(return_value='secret'))
+ self.hs.handlers = RegistrationHandlers(self.hs)
+ self.handler = self.hs.get_handlers().registration_handler
+ self.hs.get_handlers().profile_handler = Mock()
self.mock_handler = Mock(spec=[
"generate_short_term_login_token",
])
-
- hs.get_handlers().auth_handler = self.mock_handler
+ self.hs.get_auth_handler = Mock(return_value=self.auth_handler)
@defer.inlineCallbacks
def test_user_is_created_and_logged_in_if_doesnt_exist(self):
- """
- Returns:
- The user doess not exist in this case so it will register and log it in
- """
duration_ms = 200
local_part = "someone"
display_name = "someone"
user_id = "@someone:test"
- mock_token = self.mock_handler.generate_short_term_login_token
- mock_token.return_value = 'secret'
+ result_user_id, result_token = yield self.handler.get_or_create_user(
+ local_part, display_name, duration_ms)
+ self.assertEquals(result_user_id, user_id)
+ self.assertEquals(result_token, 'secret')
+
+ @defer.inlineCallbacks
+ def test_if_user_exists(self):
+ store = self.hs.get_datastore()
+ frank = UserID.from_string("@frank:test")
+ yield store.register(
+ user_id=frank.to_string(),
+ token="jkv;g498752-43gj['eamb!-5",
+ password_hash=None)
+ duration_ms = 200
+ local_part = "frank"
+ display_name = "Frank"
+ user_id = "@frank:test"
result_user_id, result_token = yield self.handler.get_or_create_user(
local_part, display_name, duration_ms)
self.assertEquals(result_user_id, user_id)
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index abb739ae52..ab9899b7d5 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -251,12 +251,12 @@ class TypingNotificationsTestCase(unittest.TestCase):
# Gut-wrenching
from synapse.handlers.typing import RoomMember
- member = RoomMember(self.room_id, self.u_apple)
+ member = RoomMember(self.room_id, self.u_apple.to_string())
self.handler._member_typing_until[member] = 1002000
self.handler._member_typing_timer[member] = (
self.clock.call_later(1002, lambda: 0)
)
- self.handler._room_typing[self.room_id] = set((self.u_apple,))
+ self.handler._room_typing[self.room_id] = set((self.u_apple.to_string(),))
self.assertEquals(self.event_source.get_current_key(), 0)
diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py
index f3c1927ce1..f85455a5af 100644
--- a/tests/metrics/test_metric.py
+++ b/tests/metrics/test_metric.py
@@ -61,9 +61,6 @@ class CounterMetricTestCase(unittest.TestCase):
'vector{method="PUT"} 1',
])
- # Check that passing too few values errors
- self.assertRaises(ValueError, counter.inc)
-
class CallbackMetricTestCase(unittest.TestCase):
@@ -138,27 +135,27 @@ class CacheMetricTestCase(unittest.TestCase):
def test_cache(self):
d = dict()
- metric = CacheMetric("cache", lambda: len(d))
+ metric = CacheMetric("cache", lambda: len(d), "cache_name")
self.assertEquals(metric.render(), [
- 'cache:hits 0',
- 'cache:total 0',
- 'cache:size 0',
+ 'cache:hits{name="cache_name"} 0',
+ 'cache:total{name="cache_name"} 0',
+ 'cache:size{name="cache_name"} 0',
])
metric.inc_misses()
d["key"] = "value"
self.assertEquals(metric.render(), [
- 'cache:hits 0',
- 'cache:total 1',
- 'cache:size 1',
+ 'cache:hits{name="cache_name"} 0',
+ 'cache:total{name="cache_name"} 1',
+ 'cache:size{name="cache_name"} 1',
])
metric.inc_hits()
self.assertEquals(metric.render(), [
- 'cache:hits 1',
- 'cache:total 2',
- 'cache:size 1',
+ 'cache:hits{name="cache_name"} 1',
+ 'cache:total{name="cache_name"} 2',
+ 'cache:size{name="cache_name"} 1',
])
diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index affd42c015..cda0a2b27c 100644
--- a/tests/rest/client/v2_alpha/test_register.py
+++ b/tests/rest/client/v2_alpha/test_register.py
@@ -33,7 +33,6 @@ class RegisterRestServletTestCase(unittest.TestCase):
# do the dance to hook it up to the hs global
self.handlers = Mock(
- auth_handler=self.auth_handler,
registration_handler=self.registration_handler,
identity_handler=self.identity_handler,
login_handler=self.login_handler
@@ -42,6 +41,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.hs.hostname = "superbig~testing~thing.com"
self.hs.get_auth = Mock(return_value=self.auth)
self.hs.get_handlers = Mock(return_value=self.handlers)
+ self.hs.get_auth_handler = Mock(return_value=self.auth_handler)
self.hs.config.enable_registration = True
# init the thing we're testing
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 5734198121..3e2862daae 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
# we aren't testing store._base stuff here, so mock this out
- self.store._get_events_txn = Mock(return_value=events)
+ self.store._get_events = Mock(return_value=events)
yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
yield self._insert_txn(service.id, 10, events)
diff --git a/tests/utils.py b/tests/utils.py
index 59d985b5f2..e19ae581e0 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -67,6 +67,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
version_string="Synapse/tests",
database_engine=create_engine(config.database_config),
get_db_conn=db_pool.get_db_conn,
+ room_list_handler=object(),
**kargs
)
hs.setup()
@@ -75,20 +76,16 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
name, db_pool=None, datastore=datastore, config=config,
version_string="Synapse/tests",
database_engine=create_engine(config.database_config),
+ room_list_handler=object(),
**kargs
)
# bcrypt is far too slow to be doing in unit tests
- def swap_out_hash_for_testing(old_build_handlers):
- def build_handlers():
- handlers = old_build_handlers()
- auth_handler = handlers.auth_handler
- auth_handler.hash = lambda p: hashlib.md5(p).hexdigest()
- auth_handler.validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h
- return handlers
- return build_handlers
-
- hs.build_handlers = swap_out_hash_for_testing(hs.build_handlers)
+ # Need to let the HS build an auth handler and then mess with it
+ # because AuthHandler's constructor requires the HS, so we can't make one
+ # beforehand and pass it in to the HS's constructor (chicken / egg)
+ hs.get_auth_handler().hash = lambda p: hashlib.md5(p).hexdigest()
+ hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h
fed = kargs.get("resource_for_federation", None)
if fed:
diff --git a/tox.ini b/tox.ini
index 757b7189c3..52d93c65e5 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,7 +11,7 @@ deps =
setenv =
PYTHONDONTWRITEBYTECODE = no_byte_code
commands =
- /bin/bash -c "find {toxinidir} -name '*.pyc' -delete ; coverage run {env:COVERAGE_OPTS:} --source={toxinidir}/synapse \
+ /bin/sh -c "find {toxinidir} -name '*.pyc' -delete ; coverage run {env:COVERAGE_OPTS:} --source={toxinidir}/synapse \
{envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}"
{env:DUMP_COVERAGE_COMMAND:coverage report -m}
@@ -26,4 +26,4 @@ skip_install = True
basepython = python2.7
deps =
flake8
-commands = /bin/bash -c "flake8 synapse tests {env:PEP8SUFFIX:}"
+commands = /bin/sh -c "flake8 synapse tests {env:PEP8SUFFIX:}"
|