Date: Tue, 31 May 2016 20:28:42 +0100
Subject: handle emotes & notices correctly in email notifs
---
res/templates/notif.html | 6 +++++-
res/templates/notif.txt | 6 +++++-
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/res/templates/notif.html b/res/templates/notif.html
index 834840861e..88b921ca9c 100644
--- a/res/templates/notif.html
+++ b/res/templates/notif.html
@@ -17,11 +17,15 @@
{% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %}
- {{ message.sender_name }}
+ {% if message.msgtype == "m.emote" %}*{% endif %} {{ message.sender_name }}
{% endif %}
{% if message.msgtype == "m.text" %}
{{ message.body_text_html }}
+ {% elif message.msgtype == "m.emote" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.notice" %}
+ {{ message.body_text_html }}
{% elif message.msgtype == "m.image" %}
 }})
{% elif message.msgtype == "m.file" %}
diff --git a/res/templates/notif.txt b/res/templates/notif.txt
index a3ddac80ce..a37bee9833 100644
--- a/res/templates/notif.txt
+++ b/res/templates/notif.txt
@@ -1,7 +1,11 @@
{% for message in notif.messages %}
-{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
+{% if message.msgtype == "m.emote" %}* {% endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
{% if message.msgtype == "m.text" %}
{{ message.body_text_plain }}
+{% elif message.msgtype == "m.emote" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.notice" %}
+{{ message.body_text_plain }}
{% elif message.msgtype == "m.image" %}
{{ message.body_text_plain }}
{% elif message.msgtype == "m.file" %}
--
cgit 1.5.1
From 6ecb2ca4ec3fae8c6f2e837b4ec99cc6929de638 Mon Sep 17 00:00:00 2001
From: David Baker
Date: Wed, 1 Jun 2016 09:48:55 +0100
Subject: pep8
---
synapse/federation/transport/server.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index da9e7a326d..a1a334955f 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -134,7 +134,8 @@ class Authenticator(object):
class BaseFederationServlet(object):
- def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler):
+ def __init__(self, handler, authenticator, ratelimiter, server_name,
+ room_list_handler):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
@@ -492,6 +493,7 @@ class OpenIdUserInfo(BaseFederationServlet):
def _wrap(self, code):
return code
+
class PublicRoomList(BaseFederationServlet):
"""
Fetch the public room list for this server.
--
cgit 1.5.1
From 43db0d9f6a314679bd25b82354e5c469e7a010b9 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Jun 2016 10:31:09 +0100
Subject: Add get_users_with_read_receipts_in_room cache
---
synapse/push/bulk_push_rule_evaluator.py | 8 ++++----
synapse/storage/receipts.py | 28 ++++++++++++++++++++++++++++
2 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 25f2fb9da4..1e5c4b073c 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -87,13 +87,13 @@ def evaluator_for_event(event, hs, store):
all_in_room = yield store.get_users_in_room(room_id)
all_in_room = set(all_in_room)
- receipts = yield store.get_receipts_for_room(room_id, "m.read")
+ users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
# any users with pushers must be ours: they have pushers
user_ids = set(users_with_pushers)
- for r in receipts:
- if hs.is_mine_id(r['user_id']) and r['user_id'] in all_in_room:
- user_ids.add(r['user_id'])
+ for uid in users_with_receipts:
+ if hs.is_mine_id(uid) and uid in all_in_room:
+ user_ids.add(uid)
# if this event is an invite event, we may need to run rules for the user
# who's been invited, otherwise they won't get told they've been invited
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fdcf28f3e1..964f30dff7 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -34,6 +34,26 @@ class ReceiptsStore(SQLBaseStore):
"ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
)
+ @cachedInlineCallbacks()
+ def get_users_with_read_receipts_in_room(self, room_id):
+ receipts = yield self.get_receipts_for_room(room_id, "m.read")
+ defer.returnValue(set(r['user_id'] for r in receipts))
+
+ def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type,
+ user_id):
+ if receipt_type != "m.read":
+ return
+
+ # Returns an ObservableDeferred
+ res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
+
+ if res and res.called and user_id in res.result:
+ # We'd only be adding to the set, so no point invalidating if the
+ # user is already there
+ return
+
+ self.get_users_with_read_receipts_in_room.invalidate((room_id,))
+
@cached(num_args=2)
def get_receipts_for_room(self, room_id, receipt_type):
return self._simple_select_list(
@@ -228,6 +248,10 @@ class ReceiptsStore(SQLBaseStore):
txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
+ txn.call_after(
+ self._invalidate_get_users_with_receipts_in_room,
+ room_id, receipt_type, user_id,
+ )
txn.call_after(
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
@@ -373,6 +397,10 @@ class ReceiptsStore(SQLBaseStore):
txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
+ txn.call_after(
+ self._invalidate_get_users_with_receipts_in_room,
+ room_id, receipt_type, user_id,
+ )
txn.call_after(
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
--
cgit 1.5.1
From 195254cae80f4748c3fc0ac3b46000047c2e6cc0 Mon Sep 17 00:00:00 2001
From: David Baker
Date: Wed, 1 Jun 2016 11:14:16 +0100
Subject: Inject fake room list handler in tests
Otherwise it tries to start the remote public room list updating looping call which breaks.
---
tests/utils.py | 2 ++
1 file changed, 2 insertions(+)
diff --git a/tests/utils.py b/tests/utils.py
index 59d985b5f2..006abedbc1 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -67,6 +67,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
version_string="Synapse/tests",
database_engine=create_engine(config.database_config),
get_db_conn=db_pool.get_db_conn,
+ room_list_handler=object(),
**kargs
)
hs.setup()
@@ -75,6 +76,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
name, db_pool=None, datastore=datastore, config=config,
version_string="Synapse/tests",
database_engine=create_engine(config.database_config),
+ room_list_handler=object(),
**kargs
)
--
cgit 1.5.1
From d60eed07109f61ebe2120e1eb566e5bb7095fbad Mon Sep 17 00:00:00 2001
From: David Baker
Date: Wed, 1 Jun 2016 11:45:43 +0100
Subject: Limit number of notifications in an email notification
---
synapse/storage/event_push_actions.py | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 4dae51a172..940e11d7a2 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -119,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range(self, user_id,
min_stream_ordering,
- max_stream_ordering=None):
+ max_stream_ordering=None,
+ limit=20):
def get_after_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
@@ -151,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore):
if max_stream_ordering is not None:
sql += " AND ep.stream_ordering <= ?"
args.append(max_stream_ordering)
- sql += " ORDER BY ep.stream_ordering ASC"
+ sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+ args.append(limit)
txn.execute(sql, args)
return txn.fetchall()
after_read_receipt = yield self.runInteraction(
--
cgit 1.5.1
From c8285564a3772db387fd5c94b1a82329dc320e36 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Jun 2016 11:08:45 +0100
Subject: Use state to calculate get_users_in_room
---
synapse/push/action_generator.py | 2 +-
synapse/push/bulk_push_rule_evaluator.py | 27 ++++++++++++++---------
synapse/storage/events.py | 3 ---
synapse/storage/pusher.py | 38 +++++++++++++++++++++++---------
synapse/storage/roommember.py | 3 ---
5 files changed, 45 insertions(+), 28 deletions(-)
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 9b208668b6..46e768e35c 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,7 +40,7 @@ class ActionGenerator:
def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "handle_push_actions_for_event"):
bulk_evaluator = yield evaluator_for_event(
- event, self.hs, self.store
+ event, self.hs, self.store, context.current_state
)
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 1e5c4b073c..8c59e59e03 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -21,7 +21,7 @@ from twisted.internet import defer
from .baserules import list_with_base_rules
from .push_rule_evaluator import PushRuleEvaluatorForEvent
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.visibility import filter_events_for_clients
@@ -72,20 +72,24 @@ def _get_rules(room_id, user_ids, store):
@defer.inlineCallbacks
-def evaluator_for_event(event, hs, store):
+def evaluator_for_event(event, hs, store, current_state):
room_id = event.room_id
-
- # users in the room who have pushers need to get push rules run because
- # that's how their pushers work
- users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
-
# We also will want to generate notifs for other people in the room so
# their unread countss are correct in the event stream, but to avoid
# generating them for bot / AS users etc, we only do so for people who've
# sent a read receipt into the room.
- all_in_room = yield store.get_users_in_room(room_id)
- all_in_room = set(all_in_room)
+ all_in_room = set(
+ e.state_key for e in current_state.values()
+ if e.type == EventTypes.Member and e.membership == Membership.JOIN
+ )
+
+ # users in the room who have pushers need to get push rules run because
+ # that's how their pushers work
+ if_users_with_pushers = yield store.get_if_users_have_pushers(all_in_room)
+ users_with_pushers = set(
+ uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
+ )
users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
@@ -143,7 +147,10 @@ class BulkPushRuleEvaluator:
self.store, user_tuples, [event], {event.event_id: current_state}
)
- room_members = yield self.store.get_users_in_room(self.room_id)
+ room_members = set(
+ e.state_key for e in current_state.values()
+ if e.type == EventTypes.Member and e.membership == Membership.JOIN
+ )
evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4655669ba0..2b3f79577b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -342,9 +342,6 @@ class EventsStore(SQLBaseStore):
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
- txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 9e8e2e2964..39d5349eaa 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
import logging
import simplejson as json
@@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
- @cachedInlineCallbacks(num_args=1)
- def get_users_with_pushers_in_room(self, room_id):
- users = yield self.get_users_in_room(room_id)
-
+ @cachedInlineCallbacks(lru=True, num_args=1)
+ def get_if_user_has_pusher(self, user_id):
result = yield self._simple_select_many_batch(
+ table='pushers',
+ keyvalues={
+ 'user_name': 'user_id',
+ },
+ retcol='user_name',
+ desc='get_if_user_has_pusher',
+ allow_none=True,
+ )
+
+ defer.returnValue(bool(result))
+
+ @cachedList(cached_method_name="get_if_user_has_pusher",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
+ def get_if_users_have_pushers(self, user_ids):
+ rows = yield self._simple_select_many_batch(
table='pushers',
column='user_name',
- iterable=users,
+ iterable=user_ids,
retcols=['user_name'],
- desc='get_users_with_pushers_in_room'
+ desc='get_if_users_have_pushers'
)
- defer.returnValue([r['user_name'] for r in result])
+ result = {user_id: False for user_id in user_ids}
+ result.update({r['user_name']: True for r in rows})
+
+ defer.returnValue(result)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@@ -178,16 +194,16 @@ class PusherStore(SQLBaseStore):
},
)
if newly_inserted:
- # get_users_with_pushers_in_room only cares if the user has
+ # get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
- txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
yield self.runInteraction("add_pusher", f)
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
- txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
self._simple_delete_one_txn(
txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index face685ed2..41b395e07c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -58,9 +58,6 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
- txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
--
cgit 1.5.1
From 991af8b0d6406b633386384d823e5c3a9c2ceb8b Mon Sep 17 00:00:00 2001
From: David Baker
Date: Wed, 1 Jun 2016 17:40:52 +0100
Subject: WIP on unsubscribing email notifs without logging in
---
synapse/api/auth.py | 25 +++++++++++-------
synapse/rest/client/v1/pusher.py | 55 +++++++++++++++++++++++++++++++++++++++-
2 files changed, 70 insertions(+), 10 deletions(-)
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2474a1453b..2ece59bb19 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""This module contains classes for authenticating the user."""
from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json, SignatureVerifyException
@@ -42,13 +41,20 @@ AuthEventTypes = (
class Auth(object):
-
+ """
+ FIXME: This class contains a mix of functions for authenticating users
+ of our client-server API and authenticating events added to room graphs.
+ """
def __init__(self, hs):
self.hs = hs
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
+ # Docs for these currently lives at
+ # https://github.com/matrix-org/matrix-doc/blob/master/drafts/macaroons_caveats.rst
+ # In addition, we have type == delete_pusher which grants access only to
+ # delete pushers.
self._KNOWN_CAVEAT_PREFIXES = set([
"gen = ",
"guest = ",
@@ -507,7 +513,7 @@ class Auth(object):
return default
@defer.inlineCallbacks
- def get_user_by_req(self, request, allow_guest=False):
+ def get_user_by_req(self, request, allow_guest=False, rights="access"):
""" Get a registered user's ID.
Args:
@@ -529,7 +535,7 @@ class Auth(object):
)
access_token = request.args["access_token"][0]
- user_info = yield self.get_user_by_access_token(access_token)
+ user_info = yield self.get_user_by_access_token(access_token, rights)
user = user_info["user"]
token_id = user_info["token_id"]
is_guest = user_info["is_guest"]
@@ -590,7 +596,7 @@ class Auth(object):
defer.returnValue(user_id)
@defer.inlineCallbacks
- def get_user_by_access_token(self, token):
+ def get_user_by_access_token(self, token, rights="access"):
""" Get a registered user's ID.
Args:
@@ -601,7 +607,7 @@ class Auth(object):
AuthError if no user by that token exists or the token is invalid.
"""
try:
- ret = yield self.get_user_from_macaroon(token)
+ ret = yield self.get_user_from_macaroon(token, rights)
except AuthError:
# TODO(daniel): Remove this fallback when all existing access tokens
# have been re-issued as macaroons.
@@ -609,11 +615,11 @@ class Auth(object):
defer.returnValue(ret)
@defer.inlineCallbacks
- def get_user_from_macaroon(self, macaroon_str):
+ def get_user_from_macaroon(self, macaroon_str, rights="access"):
try:
macaroon = pymacaroons.Macaroon.deserialize(macaroon_str)
- self.validate_macaroon(macaroon, "access", self.hs.config.expire_access_token)
+ self.validate_macaroon(macaroon, rights, self.hs.config.expire_access_token)
user_prefix = "user_id = "
user = None
@@ -667,7 +673,8 @@ class Auth(object):
Args:
macaroon(pymacaroons.Macaroon): The macaroon to validate
- type_string(str): The kind of token this is (e.g. "access", "refresh")
+ type_string(str): The kind of token required (e.g. "access", "refresh",
+ "delete_pusher")
verify_expiry(bool): Whether to verify whether the macaroon has expired.
This should really always be True, but no clients currently implement
token refresh, so we can't enforce expiry yet.
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index ab928a16da..fa7a0992dd 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -17,7 +17,11 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, Codes
from synapse.push import PusherConfigException
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import (
+ parse_json_object_from_request, parse_string, RestServlet
+)
+from synapse.http.server import finish_request
+from synapse.api.errors import StoreError
from .base import ClientV1RestServlet, client_path_patterns
@@ -136,6 +140,55 @@ class PushersSetRestServlet(ClientV1RestServlet):
return 200, {}
+class PushersRemoveRestServlet(RestServlet):
+ """
+ To allow pusher to be delete by clicking a link (ie. GET request)
+ """
+ PATTERNS = client_path_patterns("/pushers/remove$")
+ SUCCESS_HTML = "You have been unsubscribed"
+
+ def __init__(self, hs):
+ super(RestServlet, self).__init__()
+ self.notifier = hs.get_notifier()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request):
+ requester = yield self.auth.get_user_by_req(request, "delete_pusher")
+ user = requester.user
+
+ app_id = parse_string(request, "app_id", required=True)
+ pushkey = parse_string(request, "pushkey", required=True)
+
+ pusher_pool = self.hs.get_pusherpool()
+
+ try:
+ yield pusher_pool.remove_pusher(
+ app_id=app_id,
+ pushkey=pushkey,
+ user_id=user.to_string(),
+ )
+ except StoreError as se:
+ if se.code != 404:
+ # This is fine: they're already unsubscribed
+ raise
+
+ self.notifier.on_new_replication_data()
+
+ request.setResponseCode(200)
+ request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+ request.setHeader(b"Server", self.hs.version_string)
+ request.setHeader(b"Content-Length", b"%d" % (
+ len(PushersRemoveRestServlet.SUCCESS_HTML),
+ ))
+ request.write(PushersRemoveRestServlet.SUCCESS_HTML)
+ finish_request(request)
+ defer.returnValue(None)
+
+ def on_OPTIONS(self, _):
+ return 200, {}
+
+
def register_servlets(hs, http_server):
PushersRestServlet(hs).register(http_server)
PushersSetRestServlet(hs).register(http_server)
+ PushersRemoveRestServlet(hs).register(http_server)
--
cgit 1.5.1
From e0deeff23eec099ad6bcb6e7170f524dc14982e4 Mon Sep 17 00:00:00 2001
From: David Baker
Date: Wed, 1 Jun 2016 17:58:58 +0100
Subject: Fix room list spidering
---
synapse/handlers/room.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 77063b021a..9fd34588dd 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -441,7 +441,7 @@ class RoomListHandler(BaseHandler):
self.hs.config.secondary_directory_servers
)
self.remote_list_request_cache.set((), deferred)
- yield deferred
+ self.remote_list_cache = yield deferred
@defer.inlineCallbacks
def get_aggregated_public_room_list(self):
--
cgit 1.5.1
From aaa70e26a2eb37fbdf728393148e003dc9866afd Mon Sep 17 00:00:00 2001
From: Matthew Hodgson
Date: Wed, 1 Jun 2016 22:13:47 +0100
Subject: special case m.room.third_party_invite event auth to match invites,
otherwise they get out of sync and you get
https://github.com/vector-im/vector-web/issues/1208
---
synapse/api/auth.py | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2474a1453b..007a0998a7 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -120,6 +120,24 @@ class Auth(object):
return allowed
self.check_event_sender_in_room(event, auth_events)
+
+ # Special case to allow m.room.third_party_invite events wherever
+ # a user is allowed to issue invites. Fixes
+ # https://github.com/vector-im/vector-web/issues/1208 hopefully
+ if event.type == EventTypes.ThirdPartyInvite:
+ user_level = self._get_user_power_level(event.user_id, auth_events)
+ invite_level = self._get_named_level(auth_events, "invite", 0)
+
+ if user_level < invite_level:
+ raise AuthError(
+ 403, (
+ "You cannot issue a third party invite for %s." %
+ (event.content.display_name,)
+ )
+ )
+ else:
+ return True
+
self._can_send_event(event, auth_events)
if event.type == EventTypes.PowerLevels:
--
cgit 1.5.1
From e793866398ffb3e222a86ebb15b9d24220accbc8 Mon Sep 17 00:00:00 2001
From: David Baker
Date: Thu, 2 Jun 2016 09:41:13 +0100
Subject: Use user_id in email greeting if display name is null
---
synapse/push/mailer.py | 2 ++
1 file changed, 2 insertions(+)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 3ae92d1574..fe5d67a03c 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -122,6 +122,8 @@ class Mailer(object):
user_display_name = yield self.store.get_profile_displayname(
UserID.from_string(user_id).localpart
)
+ if user_display_name is None:
+ user_display_name = user_id
except StoreError:
user_display_name = user_id
--
cgit 1.5.1
From a15ad608496fd29fb8bf289152c23adca822beca Mon Sep 17 00:00:00 2001
From: David Baker
Date: Thu, 2 Jun 2016 11:44:15 +0100
Subject: Email unsubscribing that may in theory, work
Were it not for that fact that you can't use the base handler in the pusher because it pulls in the world. Comitting while I fix that on a different branch.
---
synapse/handlers/auth.py | 5 +++++
synapse/push/emailpusher.py | 2 +-
synapse/push/mailer.py | 21 ++++++++++++++++-----
3 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 26c865e171..200793b5ed 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -529,6 +529,11 @@ class AuthHandler(BaseHandler):
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
+ def generate_delete_pusher_token(self, user_id):
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = delete_pusher")
+ return macaroon.serialize()
+
def validate_short_term_login_token_and_get_user_id(self, login_token):
try:
macaroon = pymacaroons.Macaroon.deserialize(login_token)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index a72cba8306..46d7c0434b 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -273,5 +273,5 @@ class EmailPusher(object):
logger.info("Sending notif email for user %r", self.user_id)
yield self.mailer.send_notification_mail(
- self.user_id, self.email, push_actions, reason
+ self.app_id, self.user_id, self.email, push_actions, reason
)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 3ae92d1574..95250bad7d 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -81,6 +81,7 @@ class Mailer(object):
def __init__(self, hs):
self.hs = hs
self.store = self.hs.get_datastore()
+ self.handlers = self.hs.get_handlers()
self.state_handler = self.hs.get_state_handler()
loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
self.app_name = self.hs.config.email_app_name
@@ -95,7 +96,8 @@ class Mailer(object):
)
@defer.inlineCallbacks
- def send_notification_mail(self, user_id, email_address, push_actions, reason):
+ def send_notification_mail(self, app_id, user_id, email_address,
+ push_actions, reason):
raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1]
raw_to = email.utils.parseaddr(email_address)[1]
@@ -157,7 +159,7 @@ class Mailer(object):
template_vars = {
"user_display_name": user_display_name,
- "unsubscribe_link": self.make_unsubscribe_link(),
+ "unsubscribe_link": self.make_unsubscribe_link(app_id, email_address),
"summary_text": summary_text,
"app_name": self.app_name,
"rooms": rooms,
@@ -423,9 +425,18 @@ class Mailer(object):
notif['room_id'], notif['event_id']
)
- def make_unsubscribe_link(self):
- # XXX: matrix.to
- return "https://vector.im/#/settings"
+ def make_unsubscribe_link(self, app_id, email_address):
+ params = {
+ "access_token": self.handlers.auth.generate_delete_pusher_token(),
+ "app_id": app_id,
+ "pushkey": email_address,
+ }
+
+ # XXX: make r0 once API is stable
+ return "%s_matrix/client/unstable/pushers/remove?%s" % (
+ self.hs.config.public_baseurl,
+ urllib.urlencode(params),
+ )
def mxc_to_http_filter(self, value, width, height, resize_method="crop"):
if value[0:6] != "mxc://":
--
cgit 1.5.1
From f84b89f0c6b2e67897fec8639b79bf1d45c8f2b6 Mon Sep 17 00:00:00 2001
From: Matthew Hodgson
Date: Thu, 2 Jun 2016 13:29:48 +0100
Subject: if an email pusher specifies a brand param, use it
---
synapse/push/emailpusher.py | 7 ++++++-
synapse/push/mailer.py | 4 ++--
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index a72cba8306..e38ed02006 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -72,7 +72,12 @@ class EmailPusher(object):
self.processing = False
if self.hs.config.email_enable_notifs:
- self.mailer = Mailer(self.hs)
+ if 'data' in pusherdict and 'brand' in pusherdict['data']:
+ app_name = pusherdict['data']['brand']
+ else:
+ app_name = self.hs.config.email_app_name
+
+ self.mailer = Mailer(self.hs, app_name)
else:
self.mailer = None
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index fe5d67a03c..0e9d8ccb53 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -78,12 +78,12 @@ ALLOWED_ATTRS = {
class Mailer(object):
- def __init__(self, hs):
+ def __init__(self, hs, app_name):
self.hs = hs
self.store = self.hs.get_datastore()
self.state_handler = self.hs.get_state_handler()
loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
- self.app_name = self.hs.config.email_app_name
+ self.app_name = app_name
env = jinja2.Environment(loader=loader)
env.filters["format_ts"] = format_ts_filter
env.filters["mxc_to_http"] = self.mxc_to_http_filter
--
cgit 1.5.1
From 4a10510cd5aff790127a185ecefc83b881a717cc Mon Sep 17 00:00:00 2001
From: David Baker
Date: Thu, 2 Jun 2016 13:31:45 +0100
Subject: Split out the auth handler
---
synapse/handlers/__init__.py | 2 --
synapse/handlers/register.py | 2 +-
synapse/rest/client/v1/login.py | 11 ++++++-----
synapse/rest/client/v2_alpha/account.py | 4 ++--
synapse/rest/client/v2_alpha/auth.py | 2 +-
synapse/rest/client/v2_alpha/register.py | 2 +-
synapse/rest/client/v2_alpha/tokenrefresh.py | 2 +-
synapse/server.py | 5 +++++
tests/rest/client/v2_alpha/test_register.py | 2 +-
tests/utils.py | 15 +++++----------
10 files changed, 23 insertions(+), 24 deletions(-)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index c0069e23d6..d28e07f0d9 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -24,7 +24,6 @@ from .federation import FederationHandler
from .profile import ProfileHandler
from .directory import DirectoryHandler
from .admin import AdminHandler
-from .auth import AuthHandler
from .identity import IdentityHandler
from .receipts import ReceiptsHandler
from .search import SearchHandler
@@ -50,7 +49,6 @@ class Handlers(object):
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.receipts_handler = ReceiptsHandler(hs)
- self.auth_handler = AuthHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 16f33f8371..bbc07b045e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -413,7 +413,7 @@ class RegistrationHandler(BaseHandler):
defer.returnValue((user_id, token))
def auth_handler(self):
- return self.hs.get_handlers().auth_handler
+ return self.hs.get_auth_handler()
@defer.inlineCallbacks
def guest_access_token_for(self, medium, address, inviter_user_id):
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 3b5544851b..8df9d10efa 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -58,6 +58,7 @@ class LoginRestServlet(ClientV1RestServlet):
self.cas_required_attributes = hs.config.cas_required_attributes
self.servername = hs.config.server_name
self.http_client = hs.get_simple_http_client()
+ self.auth_handler = self.hs.get_auth_handler()
def on_GET(self, request):
flows = []
@@ -143,7 +144,7 @@ class LoginRestServlet(ClientV1RestServlet):
user_id, self.hs.hostname
).to_string()
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_id, access_token, refresh_token = yield auth_handler.login_with_password(
user_id=user_id,
password=login_submission["password"])
@@ -160,7 +161,7 @@ class LoginRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def do_token_login(self, login_submission):
token = login_submission['token']
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_id = (
yield auth_handler.validate_short_term_login_token_and_get_user_id(token)
)
@@ -194,7 +195,7 @@ class LoginRestServlet(ClientV1RestServlet):
raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
user_id = UserID.create(user, self.hs.hostname).to_string()
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_exists = yield auth_handler.does_user_exist(user_id)
if user_exists:
user_id, access_token, refresh_token = (
@@ -243,7 +244,7 @@ class LoginRestServlet(ClientV1RestServlet):
raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
user_id = UserID.create(user, self.hs.hostname).to_string()
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_exists = yield auth_handler.does_user_exist(user_id)
if user_exists:
user_id, access_token, refresh_token = (
@@ -412,7 +413,7 @@ class CasTicketServlet(ClientV1RestServlet):
raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
user_id = UserID.create(user, self.hs.hostname).to_string()
- auth_handler = self.handlers.auth_handler
+ auth_handler = self.auth_handler
user_exists = yield auth_handler.does_user_exist(user_id)
if not user_exists:
user_id, _ = (
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index c88c270537..9a84873a5f 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -35,7 +35,7 @@ class PasswordRestServlet(RestServlet):
super(PasswordRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
- self.auth_handler = hs.get_handlers().auth_handler
+ self.auth_handler = hs.get_auth_handler()
@defer.inlineCallbacks
def on_POST(self, request):
@@ -97,7 +97,7 @@ class ThreepidRestServlet(RestServlet):
self.hs = hs
self.identity_handler = hs.get_handlers().identity_handler
self.auth = hs.get_auth()
- self.auth_handler = hs.get_handlers().auth_handler
+ self.auth_handler = hs.get_auth_handler()
@defer.inlineCallbacks
def on_GET(self, request):
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 78181b7b18..58d3cad6a1 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -104,7 +104,7 @@ class AuthRestServlet(RestServlet):
super(AuthRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
- self.auth_handler = hs.get_handlers().auth_handler
+ self.auth_handler = hs.get_auth_handler()
self.registration_handler = hs.get_handlers().registration_handler
@defer.inlineCallbacks
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 1ecc02d94d..2088c316d1 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -49,7 +49,7 @@ class RegisterRestServlet(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
- self.auth_handler = hs.get_handlers().auth_handler
+ self.auth_handler = hs.get_auth_handler()
self.registration_handler = hs.get_handlers().registration_handler
self.identity_handler = hs.get_handlers().identity_handler
diff --git a/synapse/rest/client/v2_alpha/tokenrefresh.py b/synapse/rest/client/v2_alpha/tokenrefresh.py
index a158c2209a..8270e8787f 100644
--- a/synapse/rest/client/v2_alpha/tokenrefresh.py
+++ b/synapse/rest/client/v2_alpha/tokenrefresh.py
@@ -38,7 +38,7 @@ class TokenRefreshRestServlet(RestServlet):
body = parse_json_object_from_request(request)
try:
old_refresh_token = body["refresh_token"]
- auth_handler = self.hs.get_handlers().auth_handler
+ auth_handler = self.hs.get_auth_handler()
(user_id, new_refresh_token) = yield self.store.exchange_refresh_token(
old_refresh_token, auth_handler.generate_refresh_token)
new_access_token = yield auth_handler.issue_access_token(user_id)
diff --git a/synapse/server.py b/synapse/server.py
index 7cf22b1eea..dd4b81c658 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -33,6 +33,7 @@ from synapse.handlers.presence import PresenceHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
from synapse.handlers.room import RoomListHandler
+from synapse.handlers.auth import AuthHandler
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.state import StateHandler
from synapse.storage import DataStore
@@ -89,6 +90,7 @@ class HomeServer(object):
'sync_handler',
'typing_handler',
'room_list_handler',
+ 'auth_handler',
'application_service_api',
'application_service_scheduler',
'application_service_handler',
@@ -190,6 +192,9 @@ class HomeServer(object):
def build_room_list_handler(self):
return RoomListHandler(self)
+ def build_auth_handler(self):
+ return AuthHandler(self)
+
def build_application_service_api(self):
return ApplicationServiceApi(self)
diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index affd42c015..cda0a2b27c 100644
--- a/tests/rest/client/v2_alpha/test_register.py
+++ b/tests/rest/client/v2_alpha/test_register.py
@@ -33,7 +33,6 @@ class RegisterRestServletTestCase(unittest.TestCase):
# do the dance to hook it up to the hs global
self.handlers = Mock(
- auth_handler=self.auth_handler,
registration_handler=self.registration_handler,
identity_handler=self.identity_handler,
login_handler=self.login_handler
@@ -42,6 +41,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.hs.hostname = "superbig~testing~thing.com"
self.hs.get_auth = Mock(return_value=self.auth)
self.hs.get_handlers = Mock(return_value=self.handlers)
+ self.hs.get_auth_handler = Mock(return_value=self.auth_handler)
self.hs.config.enable_registration = True
# init the thing we're testing
diff --git a/tests/utils.py b/tests/utils.py
index 006abedbc1..e19ae581e0 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -81,16 +81,11 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
)
# bcrypt is far too slow to be doing in unit tests
- def swap_out_hash_for_testing(old_build_handlers):
- def build_handlers():
- handlers = old_build_handlers()
- auth_handler = handlers.auth_handler
- auth_handler.hash = lambda p: hashlib.md5(p).hexdigest()
- auth_handler.validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h
- return handlers
- return build_handlers
-
- hs.build_handlers = swap_out_hash_for_testing(hs.build_handlers)
+ # Need to let the HS build an auth handler and then mess with it
+ # because AuthHandler's constructor requires the HS, so we can't make one
+ # beforehand and pass it in to the HS's constructor (chicken / egg)
+ hs.get_auth_handler().hash = lambda p: hashlib.md5(p).hexdigest()
+ hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h
fed = kargs.get("resource_for_federation", None)
if fed:
--
cgit 1.5.1
From 356f13c0696526032c211c103dad2f57d18473fa Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 2 Jun 2016 14:07:38 +0100
Subject: Disable INCLUDE_ALL_UNREAD_NOTIFS
---
synapse/push/emailpusher.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index e38ed02006..2c21ed3088 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -44,7 +44,8 @@ THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
# does each email include all unread notifs, or just the ones which have happened
# since the last mail?
-INCLUDE_ALL_UNREAD_NOTIFS = True
+# XXX: this is currently broken as it includes ones from parted rooms(!)
+INCLUDE_ALL_UNREAD_NOTIFS = False
class EmailPusher(object):
--
cgit 1.5.1
From 70599ce9252997d32d0bf9f26a4e02c99bbe474d Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 2 Jun 2016 15:20:15 +0100
Subject: Allow external processes to mark a user as syncing. (#812)
* Add infrastructure to the presence handler to track sync requests in external processes
* Expire stale entries for dead external processes
* Add an http endpoint for making users as syncing
Add some docstrings and comments.
* Fixes
---
synapse/handlers/presence.py | 119 +++++++++++++++++++++++++++----
synapse/replication/presence_resource.py | 59 +++++++++++++++
synapse/replication/resource.py | 2 +
tests/handlers/test_presence.py | 16 ++---
4 files changed, 174 insertions(+), 22 deletions(-)
create mode 100644 synapse/replication/presence_resource.py
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37f57301fb..fc8538b41e 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -68,6 +68,10 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
# How often to resend presence to remote servers
FEDERATION_PING_INTERVAL = 25 * 60 * 1000
+# How long we will wait before assuming that the syncs from an external process
+# are dead.
+EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
@@ -158,10 +162,21 @@ class PresenceHandler(object):
self.serial_to_user = {}
self._next_serial = 1
- # Keeps track of the number of *ongoing* syncs. While this is non zero
- # a user will never go offline.
+ # Keeps track of the number of *ongoing* syncs on this process. While
+ # this is non zero a user will never go offline.
self.user_to_num_current_syncs = {}
+ # Keeps track of the number of *ongoing* syncs on other processes.
+ # While any sync is ongoing on another process the user will never
+ # go offline.
+ # Each process has a unique identifier and an update frequency. If
+ # no update is received from that process within the update period then
+ # we assume that all the sync requests on that process have stopped.
+ # Stored as a dict from process_id to set of user_id, and a dict of
+ # process_id to millisecond timestamp last updated.
+ self.external_process_to_current_syncs = {}
+ self.external_process_last_updated_ms = {}
+
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
@@ -272,13 +287,26 @@ class PresenceHandler(object):
# Fetch the list of users that *may* have timed out. Things may have
# changed since the timeout was set, so we won't necessarily have to
# take any action.
- users_to_check = self.wheel_timer.fetch(now)
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_update.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_to_current_syncs.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
states = [
self.user_to_current_state.get(
user_id, UserPresenceState.default(user_id)
)
- for user_id in set(users_to_check)
+ for user_id in users_to_check
]
timers_fired_counter.inc_by(len(states))
@@ -286,7 +314,7 @@ class PresenceHandler(object):
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
- user_to_num_current_syncs=self.user_to_num_current_syncs,
+ syncing_users=self.get_syncing_users(),
now=now,
)
@@ -363,6 +391,73 @@ class PresenceHandler(object):
defer.returnValue(_user_syncing())
+ def get_currently_syncing_users(self):
+ """Get the set of user ids that are currently syncing on this HS.
+ Returns:
+ set(str): A set of user_id strings.
+ """
+ syncing_user_ids = {
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count
+ }
+ syncing_user_ids.update(self.external_process_to_current_syncs.values())
+ return syncing_user_ids
+
+ @defer.inlineCallbacks
+ def update_external_syncs(self, process_id, syncing_user_ids):
+ """Update the syncing users for an external process
+
+ Args:
+ process_id(str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+ as user start and stop syncing against a given process.
+ syncing_user_ids(set(str)): The set of user_ids that are
+ currently syncing on that server.
+ """
+
+ # Grab the previous list of user_ids that were syncing on that process
+ prev_syncing_user_ids = (
+ self.external_process_to_current_syncs.get(process_id, set())
+ )
+ # Grab the current presence state for both the users that are syncing
+ # now and the users that were syncing before this update.
+ prev_states = yield self.current_state_for_users(
+ syncing_user_ids | prev_syncing_user_ids
+ )
+ updates = []
+ time_now_ms = self.clock.time_msec()
+
+ # For each new user that is syncing check if we need to mark them as
+ # being online.
+ for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+ prev_state = prev_states[new_user_id]
+ if prev_state.state == PresenceState.OFFLINE:
+ updates.append(prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=time_now_ms,
+ last_user_sync_ts=time_now_ms,
+ ))
+ else:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ ))
+
+ # For each user that is still syncing or stopped syncing update the
+ # last sync time so that we will correctly apply the grace period when
+ # they stop syncing.
+ for old_user_id in prev_syncing_user_ids:
+ prev_state = prev_states[old_user_id]
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ ))
+
+ yield self._update_states(updates)
+
+ # Update the last updated time for the process. We expire the entries
+ # if we don't receive an update in the given timeframe.
+ self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+ self.external_process_to_current_syncs[process_id] = syncing_user_ids
+
@defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
@@ -935,15 +1030,14 @@ class PresenceEventSource(object):
return self.get_new_events(user, from_key=None, include_offline=False)
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
appropriate.
Args:
user_states(list): List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours
- user_to_num_current_syncs (dict): Mapping of user_id to number of currently
- active syncs.
+ syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
Returns:
@@ -954,21 +1048,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
for state in user_states:
is_mine = is_mine_fn(state.user_id)
- new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+ new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
if new_state:
changes[state.user_id] = new_state
return changes.values()
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
state (UserPresenceState)
is_mine (bool): Whether the user is ours
- user_to_num_current_syncs (dict): Mapping of user_id to number of currently
- active syncs.
+ syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
Returns:
@@ -1002,7 +1095,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
# If there are have been no sync for a while (and none ongoing),
# set presence to offline
- if not user_to_num_current_syncs.get(user_id, 0):
+ if user_id not in syncing_user_ids:
if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py
new file mode 100644
index 0000000000..fc18130ab4
--- /dev/null
+++ b/synapse/replication/presence_resource.py
@@ -0,0 +1,59 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import defer
+
+
+class PresenceResource(Resource):
+ """
+ HTTP endpoint for marking users as syncing.
+
+ POST /_synapse/replication/presence HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "process_id": "",
+ "syncing_users": [""]
+ }
+ """
+
+ def __init__(self, hs):
+ Resource.__init__(self) # Resource is old-style, so no super()
+
+ self.version_string = hs.version_string
+ self.clock = hs.get_clock()
+ self.presence_handler = hs.get_presence_handler()
+
+ def render_POST(self, request):
+ self._async_render_POST(request)
+ return NOT_DONE_YET
+
+ @request_handler()
+ @defer.inlineCallbacks
+ def _async_render_POST(self, request):
+ content = parse_json_object_from_request(request)
+
+ process_id = content["process_id"]
+ syncing_user_ids = content["syncing_users"]
+
+ yield self.presence_handler.update_external_syncs(
+ process_id, set(syncing_user_ids)
+ )
+
+ respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 847f212a3d..8c2d487ff4 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -16,6 +16,7 @@
from synapse.http.servlet import parse_integer, parse_string
from synapse.http.server import request_handler, finish_request
from synapse.replication.pusher_resource import PusherResource
+from synapse.replication.presence_resource import PresenceResource
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
@@ -115,6 +116,7 @@ class ReplicationResource(Resource):
self.clock = hs.get_clock()
self.putChild("remove_pushers", PusherResource(hs))
+ self.putChild("syncing_users", PresenceResource(hs))
def render_GET(self, request):
self._async_render_GET(request)
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 87c795fcfa..b531ba8540 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
@@ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
@@ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={
- user_id: 1,
- }, now=now
+ state, is_mine=True, syncing_user_ids=set([user_id]), now=now
)
self.assertIsNotNone(new_state)
@@ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
@@ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNone(new_state)
@@ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=False, user_to_num_current_syncs={}, now=now
+ state, is_mine=False, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
@@ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
)
new_state = handle_timeout(
- state, is_mine=True, user_to_num_current_syncs={}, now=now
+ state, is_mine=True, syncing_user_ids=set(), now=now
)
self.assertIsNotNone(new_state)
--
cgit 1.5.1
From 661a540dd1de89a3ab3a8f6ca0f780ea7d264176 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 2 Jun 2016 15:20:28 +0100
Subject: Deduplicate presence entries in sync (#818)
---
synapse/handlers/sync.py | 3 +++
1 file changed, 3 insertions(+)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3b89582d79..5307b62b85 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -637,6 +637,9 @@ class SyncHandler(object):
)
presence.extend(states)
+ # Deduplicate the presence entries so that there's at most one per user
+ presence = {p["content"]["user_id"]: p for p in presence}.values()
+
presence = sync_config.filter_collection.filter_presence(
presence
)
--
cgit 1.5.1
From 80f34d7b574e1a6b8bc922df41bd53b59260fcf2 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 2 Jun 2016 15:23:09 +0100
Subject: Fix setting the _clock in SQLBaseStore
---
synapse/storage/_base.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 56a0dd80f3..32c6677d47 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -152,6 +152,7 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
+ self._clock = hs.get_clock()
self._db_pool = hs.get_db_pool()
self._previous_txn_total_time = 0
--
cgit 1.5.1
From 56d15a05306896555ded3025f8a808bda04872fa Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 2 Jun 2016 16:28:54 +0100
Subject: Store the typing users as user_id strings. (#819)
Rather than storing them as UserID objects.
---
synapse/handlers/typing.py | 64 ++++++++++++++++++++++++-------------------
tests/handlers/test_typing.py | 4 +--
2 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d46f05f426..3c54307bed 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
# A tiny object useful for storing a user's membership in a room, as a mapping
# key
-RoomMember = namedtuple("RoomMember", ("room_id", "user"))
+RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
class TypingHandler(object):
@@ -38,7 +38,7 @@ class TypingHandler(object):
self.store = hs.get_datastore()
self.server_name = hs.config.server_name
self.auth = hs.get_auth()
- self.is_mine = hs.is_mine
+ self.is_mine_id = hs.is_mine_id
self.notifier = hs.get_notifier()
self.clock = hs.get_clock()
@@ -67,20 +67,23 @@ class TypingHandler(object):
@defer.inlineCallbacks
def started_typing(self, target_user, auth_user, room_id, timeout):
- if not self.is_mine(target_user):
+ target_user_id = target_user.to_string()
+ auth_user_id = auth_user.to_string()
+
+ if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this Home Server")
- if target_user != auth_user:
+ if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_joined_room(room_id, target_user.to_string())
+ yield self.auth.check_joined_room(room_id, target_user_id)
logger.debug(
- "%s has started typing in %s", target_user.to_string(), room_id
+ "%s has started typing in %s", target_user_id, room_id
)
until = self.clock.time_msec() + timeout
- member = RoomMember(room_id=room_id, user=target_user)
+ member = RoomMember(room_id=room_id, user_id=target_user_id)
was_present = member in self._member_typing_until
@@ -104,25 +107,28 @@ class TypingHandler(object):
yield self._push_update(
room_id=room_id,
- user=target_user,
+ user_id=target_user_id,
typing=True,
)
@defer.inlineCallbacks
def stopped_typing(self, target_user, auth_user, room_id):
- if not self.is_mine(target_user):
+ target_user_id = target_user.to_string()
+ auth_user_id = auth_user.to_string()
+
+ if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this Home Server")
- if target_user != auth_user:
+ if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_joined_room(room_id, target_user.to_string())
+ yield self.auth.check_joined_room(room_id, target_user_id)
logger.debug(
- "%s has stopped typing in %s", target_user.to_string(), room_id
+ "%s has stopped typing in %s", target_user_id, room_id
)
- member = RoomMember(room_id=room_id, user=target_user)
+ member = RoomMember(room_id=room_id, user_id=target_user_id)
if member in self._member_typing_timer:
self.clock.cancel_call_later(self._member_typing_timer[member])
@@ -132,8 +138,9 @@ class TypingHandler(object):
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
- if self.is_mine(user):
- member = RoomMember(room_id=room_id, user=user)
+ user_id = user.to_string()
+ if self.is_mine_id(user_id):
+ member = RoomMember(room_id=room_id, user=user_id)
yield self._stopped_typing(member)
@defer.inlineCallbacks
@@ -144,7 +151,7 @@ class TypingHandler(object):
yield self._push_update(
room_id=member.room_id,
- user=member.user,
+ user_id=member.user_id,
typing=False,
)
@@ -156,7 +163,7 @@ class TypingHandler(object):
del self._member_typing_timer[member]
@defer.inlineCallbacks
- def _push_update(self, room_id, user, typing):
+ def _push_update(self, room_id, user_id, typing):
domains = yield self.store.get_joined_hosts_for_room(room_id)
deferreds = []
@@ -164,7 +171,7 @@ class TypingHandler(object):
if domain == self.server_name:
self._push_update_local(
room_id=room_id,
- user=user,
+ user_id=user_id,
typing=typing
)
else:
@@ -173,7 +180,7 @@ class TypingHandler(object):
edu_type="m.typing",
content={
"room_id": room_id,
- "user_id": user.to_string(),
+ "user_id": user_id,
"typing": typing,
},
))
@@ -183,23 +190,26 @@ class TypingHandler(object):
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
room_id = content["room_id"]
- user = UserID.from_string(content["user_id"])
+ user_id = content["user_id"]
+
+ # Check that the string is a valid user id
+ UserID.from_string(user_id)
domains = yield self.store.get_joined_hosts_for_room(room_id)
if self.server_name in domains:
self._push_update_local(
room_id=room_id,
- user=user,
+ user_id=user_id,
typing=content["typing"]
)
- def _push_update_local(self, room_id, user, typing):
+ def _push_update_local(self, room_id, user_id, typing):
room_set = self._room_typing.setdefault(room_id, set())
if typing:
- room_set.add(user)
+ room_set.add(user_id)
else:
- room_set.discard(user)
+ room_set.discard(user_id)
self._latest_room_serial += 1
self._room_serials[room_id] = self._latest_room_serial
@@ -215,9 +225,7 @@ class TypingHandler(object):
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
typing = self._room_typing[room_id]
- typing_bytes = json.dumps([
- u.to_string() for u in typing
- ], ensure_ascii=False)
+ typing_bytes = json.dumps(list(typing), ensure_ascii=False)
rows.append((serial, room_id, typing_bytes))
rows.sort()
return rows
@@ -239,7 +247,7 @@ class TypingNotificationEventSource(object):
"type": "m.typing",
"room_id": room_id,
"content": {
- "user_ids": [u.to_string() for u in typing],
+ "user_ids": list(typing),
},
}
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index abb739ae52..ab9899b7d5 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -251,12 +251,12 @@ class TypingNotificationsTestCase(unittest.TestCase):
# Gut-wrenching
from synapse.handlers.typing import RoomMember
- member = RoomMember(self.room_id, self.u_apple)
+ member = RoomMember(self.room_id, self.u_apple.to_string())
self.handler._member_typing_until[member] = 1002000
self.handler._member_typing_timer[member] = (
self.clock.call_later(1002, lambda: 0)
)
- self.handler._room_typing[self.room_id] = set((self.u_apple,))
+ self.handler._room_typing[self.room_id] = set((self.u_apple.to_string(),))
self.assertEquals(self.event_source.get_current_key(), 0)
--
cgit 1.5.1
From 07a555991600137c830eb3b06f90a305c8f1e3d8 Mon Sep 17 00:00:00 2001
From: David Baker
Date: Thu, 2 Jun 2016 17:17:16 +0100
Subject: Fix error in email notification string formatting
---
synapse/push/mailer.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 0e9d8ccb53..63c7ec18a5 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -41,7 +41,7 @@ logger = logging.getLogger(__name__)
MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \
- "in the %s room..."
+ "in the %(room)s room..."
MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
MESSAGES_IN_ROOM = "You have messages on %(app)s in the %(room)s room..."
--
cgit 1.5.1
From 2675c1e40ebc3392ce719ac2304b97e98c7fefb4 Mon Sep 17 00:00:00 2001
From: Matthew Hodgson
Date: Thu, 2 Jun 2016 17:21:12 +0100
Subject: add some branding debugging
---
synapse/push/mailer.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 0e9d8ccb53..944f3b481d 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -84,6 +84,7 @@ class Mailer(object):
self.state_handler = self.hs.get_state_handler()
loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
self.app_name = app_name
+ logger.info("Created Mailer for app_name %s" % app_name)
env = jinja2.Environment(loader=loader)
env.filters["format_ts"] = format_ts_filter
env.filters["mxc_to_http"] = self.mxc_to_http_filter
--
cgit 1.5.1
From 1f31cc37f8611f9ae5612ef5be82e63735fbdf34 Mon Sep 17 00:00:00 2001
From: David Baker
Date: Thu, 2 Jun 2016 17:21:31 +0100
Subject: Working unsubscribe links going straight to the HS
and authed by macaroons that let you delete pushers and nothing else
---
synapse/api/auth.py | 7 +++++++
synapse/app/pusher.py | 23 ++++++++++++++++++++++-
synapse/push/mailer.py | 8 ++++----
synapse/rest/client/v1/pusher.py | 4 +++-
4 files changed, 36 insertions(+), 6 deletions(-)
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 463bd8b692..31e1abb964 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -660,6 +660,13 @@ class Auth(object):
"is_guest": True,
"token_id": None,
}
+ elif rights == "delete_pusher":
+ # We don't store these tokens in the database
+ ret = {
+ "user": user,
+ "is_guest": False,
+ "token_id": None,
+ }
else:
# This codepath exists so that we can actually return a
# token ID, because we use token IDs in place of device
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 135dd58c15..f1de1e7ce9 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -21,6 +21,7 @@ from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
from synapse.config.emailconfig import EmailConfig
+from synapse.config.key import KeyConfig
from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.storage.roommember import RoomMemberStore
@@ -63,6 +64,26 @@ class SlaveConfig(DatabaseConfig):
self.pid_file = self.abspath(config.get("pid_file"))
self.public_baseurl = config["public_baseurl"]
+ # some things used by the auth handler but not actually used in the
+ # pusher codebase
+ self.bcrypt_rounds = None
+ self.ldap_enabled = None
+ self.ldap_server = None
+ self.ldap_port = None
+ self.ldap_tls = None
+ self.ldap_search_base = None
+ self.ldap_search_property = None
+ self.ldap_email_property = None
+ self.ldap_full_name_property = None
+
+ # We would otherwise try to use the registration shared secret as the
+ # macaroon shared secret if there was no macaroon_shared_secret, but
+ # that means pulling in RegistrationConfig too. We don't need to be
+ # backwards compaitible in the pusher codebase so just make people set
+ # macaroon_shared_secret. We set this to None to prevent it referencing
+ # an undefined key.
+ self.registration_shared_secret = None
+
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("pusher.pid")
return """\
@@ -95,7 +116,7 @@ class SlaveConfig(DatabaseConfig):
""" % locals()
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
+class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
pass
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index e877d8fdad..60d3700afa 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -81,7 +81,7 @@ class Mailer(object):
def __init__(self, hs, app_name):
self.hs = hs
self.store = self.hs.get_datastore()
- self.handlers = self.hs.get_handlers()
+ self.auth_handler = self.hs.get_auth_handler()
self.state_handler = self.hs.get_state_handler()
loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
self.app_name = app_name
@@ -161,7 +161,7 @@ class Mailer(object):
template_vars = {
"user_display_name": user_display_name,
- "unsubscribe_link": self.make_unsubscribe_link(app_id, email_address),
+ "unsubscribe_link": self.make_unsubscribe_link(user_id, app_id, email_address),
"summary_text": summary_text,
"app_name": self.app_name,
"rooms": rooms,
@@ -427,9 +427,9 @@ class Mailer(object):
notif['room_id'], notif['event_id']
)
- def make_unsubscribe_link(self, app_id, email_address):
+ def make_unsubscribe_link(self, user_id, app_id, email_address):
params = {
- "access_token": self.handlers.auth.generate_delete_pusher_token(),
+ "access_token": self.auth_handler.generate_delete_pusher_token(user_id),
"app_id": app_id,
"pushkey": email_address,
}
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index fa7a0992dd..9a2ed6ed88 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -149,11 +149,13 @@ class PushersRemoveRestServlet(RestServlet):
def __init__(self, hs):
super(RestServlet, self).__init__()
+ self.hs = hs
self.notifier = hs.get_notifier()
+ self.auth = hs.get_v1auth()
@defer.inlineCallbacks
def on_GET(self, request):
- requester = yield self.auth.get_user_by_req(request, "delete_pusher")
+ requester = yield self.auth.get_user_by_req(request, rights="delete_pusher")
user = requester.user
app_id = parse_string(request, "app_id", required=True)
--
cgit 1.5.1
From 745ddb4dd04d0346f27f65a1e5508900b58e658a Mon Sep 17 00:00:00 2001
From: David Baker
Date: Thu, 2 Jun 2016 17:38:41 +0100
Subject: peppate
---
synapse/push/mailer.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 60d3700afa..3c9a66008d 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -161,7 +161,9 @@ class Mailer(object):
template_vars = {
"user_display_name": user_display_name,
- "unsubscribe_link": self.make_unsubscribe_link(user_id, app_id, email_address),
+ "unsubscribe_link": self.make_unsubscribe_link(
+ user_id, app_id, email_address
+ ),
"summary_text": summary_text,
"app_name": self.app_name,
"rooms": rooms,
--
cgit 1.5.1
From 79d1f072f4fc9a6b2e9773e8cb700b26bc2dff51 Mon Sep 17 00:00:00 2001
From: Matthew Hodgson
Date: Thu, 2 Jun 2016 21:34:40 +0100
Subject: brand the email from header
---
synapse/config/emailconfig.py | 2 +-
synapse/push/mailer.py | 9 ++++++++-
2 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 90bdd08f00..a187161272 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -89,7 +89,7 @@ class EmailConfig(Config):
# enable_notifs: false
# smtp_host: "localhost"
# smtp_port: 25
- # notif_from: Your Friendly Matrix Home Server
+ # notif_from: "Your Friendly %(app)s Home Server "
# app_name: Matrix
# template_dir: res/templates
# notif_template_html: notif_mail.html
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 944f3b481d..c1e9057eb6 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -97,7 +97,14 @@ class Mailer(object):
@defer.inlineCallbacks
def send_notification_mail(self, user_id, email_address, push_actions, reason):
- raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1]
+ try:
+ from_string = self.hs.config.email_notif_from % {
+ "app": self.app_name
+ }
+ except TypeError:
+ from_string = self.hs.config.email_notif_from
+
+ raw_from = email.utils.parseaddr(from_string)[1]
raw_to = email.utils.parseaddr(email_address)[1]
if raw_to == '':
--
cgit 1.5.1
From 0eae0757232169b833224f48208aed9fdc9c6fe6 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 10:58:03 +0100
Subject: Add slaved stores for filters, tokens, and push rules
---
synapse/replication/slave/storage/appservice.py | 30 ++++++++++
synapse/replication/slave/storage/filtering.py | 24 ++++++++
synapse/replication/slave/storage/push_rule.py | 67 +++++++++++++++++++++++
synapse/replication/slave/storage/registration.py | 30 ++++++++++
4 files changed, 151 insertions(+)
create mode 100644 synapse/replication/slave/storage/appservice.py
create mode 100644 synapse/replication/slave/storage/filtering.py
create mode 100644 synapse/replication/slave/storage/push_rule.py
create mode 100644 synapse/replication/slave/storage/registration.py
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
new file mode 100644
index 0000000000..25792d9429
--- /dev/null
+++ b/synapse/replication/slave/storage/appservice.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.config.appservice import load_appservices
+
+
+class SlavedApplicationServiceStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
+ self.services_cache = load_appservices(
+ hs.config.server_name,
+ hs.config.app_service_config_files
+ )
+
+ get_app_service_by_token = DataStore.get_app_service_by_token.__func__
+ get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
new file mode 100644
index 0000000000..5037f395b9
--- /dev/null
+++ b/synapse/replication/slave/storage/filtering.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.filtering import FilteringStore
+
+
+class SlavedFilteringStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedFilteringStore, self).__init__(db_conn, hs)
+
+ get_user_filter = FilteringStore.__dict__["get_user_filter"]
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
new file mode 100644
index 0000000000..21ceb0213a
--- /dev/null
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .events import SlavedEventStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
+from synapse.storage.push_rule import PushRuleStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class SlavedPushRuleStore(SlavedEventStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedPushRuleStore, self).__init__(db_conn, hs)
+ self._push_rules_stream_id_gen = SlavedIdTracker(
+ db_conn, "push_rules_stream", "stream_id",
+ )
+ self.push_rules_stream_cache = StreamChangeCache(
+ "PushRulesStreamChangeCache",
+ self._push_rules_stream_id_gen.get_current_token(),
+ )
+
+ get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
+ get_push_rules_enabled_for_user = (
+ PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
+ )
+ have_push_rules_changed_for_user = (
+ DataStore.have_push_rules_changed_for_user.__func__
+ )
+
+ def get_push_rules_stream_token(self):
+ return (
+ self._push_rules_stream_id_gen.get_current_token(),
+ self._stream_id_gen.get_current_token(),
+ )
+
+ def stream_positions(self):
+ result = super(SlavedPushRuleStore, self).stream_positions()
+ result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("push_rules")
+ if stream:
+ for row in stream["rows"]:
+ position = row[0]
+ user_id = row[2]
+ self.get_push_rules_for_user.invalidate((user_id,))
+ self.get_push_rules_enabled_for_user.invalidate((user_id,))
+ self.push_rules_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ self._push_rules_stream_id_gen.advance(int(stream["position"]))
+
+ return super(SlavedPushRuleStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
new file mode 100644
index 0000000000..307833f9e1
--- /dev/null
+++ b/synapse/replication/slave/storage/registration.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.registration import RegistrationStore
+
+
+class SlavedRegistrationStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedRegistrationStore, self).__init__(db_conn, hs)
+
+ # TODO: use the cached version and invalidate deleted tokens
+ get_user_by_access_token = RegistrationStore.__dict__[
+ "get_user_by_access_token"
+ ].orig
+
+ _query_for_auth = DataStore._query_for_auth.__func__
--
cgit 1.5.1
From f88d747f7959808884451245aeba65edf7c490bf Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 11:03:10 +0100
Subject: Add a comment explaining why the filter cache doesn't need exipiring
---
synapse/replication/slave/storage/filtering.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 5037f395b9..819ed62881 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -21,4 +21,5 @@ class SlavedFilteringStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedFilteringStore, self).__init__(db_conn, hs)
+ # Filters are immutable so this cache doesn't need to be expired
get_user_filter = FilteringStore.__dict__["get_user_filter"]
--
cgit 1.5.1
From 9c26b390a2112590fe4252057dc1f081cb99a6b1 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Jun 2016 11:34:06 +0100
Subject: Only get local users
---
synapse/push/bulk_push_rule_evaluator.py | 7 +++++--
synapse/storage/pusher.py | 2 +-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 8c59e59e03..d50db3b736 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -83,10 +83,13 @@ def evaluator_for_event(event, hs, store, current_state):
e.state_key for e in current_state.values()
if e.type == EventTypes.Member and e.membership == Membership.JOIN
)
+ local_users_in_room = set(uid for uid in all_in_room if hs.is_mine_id(uid))
# users in the room who have pushers need to get push rules run because
# that's how their pushers work
- if_users_with_pushers = yield store.get_if_users_have_pushers(all_in_room)
+ if_users_with_pushers = yield store.get_if_users_have_pushers(
+ local_users_in_room
+ )
users_with_pushers = set(
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
)
@@ -96,7 +99,7 @@ def evaluator_for_event(event, hs, store, current_state):
# any users with pushers must be ours: they have pushers
user_ids = set(users_with_pushers)
for uid in users_with_receipts:
- if hs.is_mine_id(uid) and uid in all_in_room:
+ if uid in local_users_in_room:
user_ids.add(uid)
# if this event is an invite event, we may need to run rules for the user
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 39d5349eaa..a7d7c54d7e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -135,7 +135,7 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
- @cachedInlineCallbacks(lru=True, num_args=1)
+ @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
def get_if_user_has_pusher(self, user_id):
result = yield self._simple_select_many_batch(
table='pushers',
--
cgit 1.5.1
From 59f2d7352224af97e8f091f673858dde42b00197 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Jun 2016 15:45:37 +0100
Subject: Remove unnecessary sets
---
synapse/push/bulk_push_rule_evaluator.py | 9 +++------
1 file changed, 3 insertions(+), 6 deletions(-)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index d50db3b736..af5212a5d1 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -79,25 +79,24 @@ def evaluator_for_event(event, hs, store, current_state):
# generating them for bot / AS users etc, we only do so for people who've
# sent a read receipt into the room.
- all_in_room = set(
+ local_users_in_room = set(
e.state_key for e in current_state.values()
if e.type == EventTypes.Member and e.membership == Membership.JOIN
+ and hs.is_mine_id(e.state_key)
)
- local_users_in_room = set(uid for uid in all_in_room if hs.is_mine_id(uid))
# users in the room who have pushers need to get push rules run because
# that's how their pushers work
if_users_with_pushers = yield store.get_if_users_have_pushers(
local_users_in_room
)
- users_with_pushers = set(
+ user_ids = set(
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
)
users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
# any users with pushers must be ours: they have pushers
- user_ids = set(users_with_pushers)
for uid in users_with_receipts:
if uid in local_users_in_room:
user_ids.add(uid)
@@ -111,8 +110,6 @@ def evaluator_for_event(event, hs, store, current_state):
if has_pusher:
user_ids.add(invited_user)
- user_ids = list(user_ids)
-
rules_by_user = yield _get_rules(room_id, user_ids, store)
defer.returnValue(BulkPushRuleEvaluator(
--
cgit 1.5.1
From 3ae915b27e4531031ee325931b3c62bc200ce798 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 11:05:53 +0100
Subject: Add a slaved store for presence
---
synapse/replication/slave/storage/presence.py | 59 +++++++++++++++++++++++++++
synapse/storage/__init__.py | 6 +--
2 files changed, 62 insertions(+), 3 deletions(-)
create mode 100644 synapse/replication/slave/storage/presence.py
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
new file mode 100644
index 0000000000..703f4a49bf
--- /dev/null
+++ b/synapse/replication/slave/storage/presence.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage import DataStore
+
+
+class SlavedPresenceStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedPresenceStore, self).__init__(db_conn, hs)
+ self._presence_id_gen = SlavedIdTracker(
+ db_conn, "presence_stream", "stream_id",
+ )
+
+ self._presence_on_startup = self._get_active_presence(db_conn)
+
+ self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
+ "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
+ )
+
+ _get_active_presence = DataStore._get_active_presence.__func__
+ take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+ get_presence_for_users = DataStore.get_presence_for_users.__func__
+
+ def get_current_presence_token(self):
+ return self._presence_id_gen.get_current_token()
+
+ def stream_positions(self):
+ result = super(SlavedPresenceStore, self).stream_positions()
+ position = self._presence_id_gen.get_current_token()
+ result["presence"] = position
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("presence")
+ if stream:
+ self._presence_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.presence_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ return super(SlavedPresenceStore, self).process_replication(result)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 8581796b7e..6928a213e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -149,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore,
"AccountDataAndTagsChangeCache", account_max,
)
- self.__presence_on_startup = self._get_active_presence(db_conn)
+ self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self._get_cache_dict(
db_conn, "presence_stream",
@@ -190,8 +190,8 @@ class DataStore(RoomMemberStore, RoomStore,
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):
- active_on_startup = self.__presence_on_startup
- self.__presence_on_startup = None
+ active_on_startup = self._presence_on_startup
+ self._presence_on_startup = None
return active_on_startup
def _get_active_presence(self, db_conn):
--
cgit 1.5.1
From 6a0afa582aa5bf816e082af31ac44e2a8fee28c0 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Jun 2016 14:27:07 +0100
Subject: Load push rules in storage layer, so that they get cached
---
synapse/handlers/sync.py | 5 ++---
synapse/push/bulk_push_rule_evaluator.py | 28 -----------------------
synapse/push/clientformat.py | 30 ++++++++++++++++++-------
synapse/rest/client/v1/push_rule.py | 6 ++---
synapse/storage/push_rule.py | 38 +++++++++++++++++++++++++++++++-
5 files changed, 63 insertions(+), 44 deletions(-)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5307b62b85..be26a491ff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -198,9 +198,8 @@ class SyncHandler(object):
@defer.inlineCallbacks
def push_rules_for_user(self, user):
user_id = user.to_string()
- rawrules = yield self.store.get_push_rules_for_user(user_id)
- enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
- rules = format_push_rules_for_user(user, rawrules, enabled_map)
+ rules = yield self.store.get_push_rules_for_user(user_id)
+ rules = format_push_rules_for_user(user, rules)
defer.returnValue(rules)
@defer.inlineCallbacks
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index af5212a5d1..6e42121b1d 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -18,7 +18,6 @@ import ujson as json
from twisted.internet import defer
-from .baserules import list_with_base_rules
from .push_rule_evaluator import PushRuleEvaluatorForEvent
from synapse.api.constants import EventTypes, Membership
@@ -38,36 +37,9 @@ def decode_rule_json(rule):
@defer.inlineCallbacks
def _get_rules(room_id, user_ids, store):
rules_by_user = yield store.bulk_get_push_rules(user_ids)
- rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
- rules_by_user = {
- uid: list_with_base_rules([
- decode_rule_json(rule_list)
- for rule_list in rules_by_user.get(uid, [])
- ])
- for uid in user_ids
- }
-
- # We apply the rules-enabled map here: bulk_get_push_rules doesn't
- # fetch disabled rules, but this won't account for any server default
- # rules the user has disabled, so we need to do this too.
- for uid in user_ids:
- user_enabled_map = rules_enabled_by_user.get(uid)
- if not user_enabled_map:
- continue
-
- for i, rule in enumerate(rules_by_user[uid]):
- rule_id = rule['rule_id']
-
- if rule_id in user_enabled_map:
- if rule.get('enabled', True) != bool(user_enabled_map[rule_id]):
- # Rules are cached across users.
- rule = dict(rule)
- rule['enabled'] = bool(user_enabled_map[rule_id])
- rules_by_user[uid][i] = rule
-
defer.returnValue(rules_by_user)
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index ae9db9ec2f..b3983f7940 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -23,10 +23,7 @@ import copy
import simplejson as json
-def format_push_rules_for_user(user, rawrules, enabled_map):
- """Converts a list of rawrules and a enabled map into nested dictionaries
- to match the Matrix client-server format for push rules"""
-
+def load_rules_for_user(user, rawrules, enabled_map):
ruleslist = []
for rawrule in rawrules:
rule = dict(rawrule)
@@ -35,7 +32,26 @@ def format_push_rules_for_user(user, rawrules, enabled_map):
ruleslist.append(rule)
# We're going to be mutating this a lot, so do a deep copy
- ruleslist = copy.deepcopy(list_with_base_rules(ruleslist))
+ rules = list(list_with_base_rules(ruleslist))
+
+ for i, rule in enumerate(rules):
+ rule_id = rule['rule_id']
+ if rule_id in enabled_map:
+ if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+ # Rules are cached across users.
+ rule = dict(rule)
+ rule['enabled'] = bool(enabled_map[rule_id])
+ rules[i] = rule
+
+ return rules
+
+
+def format_push_rules_for_user(user, ruleslist):
+ """Converts a list of rawrules and a enabled map into nested dictionaries
+ to match the Matrix client-server format for push rules"""
+
+ # We're going to be mutating this a lot, so do a deep copy
+ ruleslist = copy.deepcopy(ruleslist)
rules = {'global': {}, 'device': {}}
@@ -60,9 +76,7 @@ def format_push_rules_for_user(user, rawrules, enabled_map):
template_rule = _rule_to_template(r)
if template_rule:
- if r['rule_id'] in enabled_map:
- template_rule['enabled'] = enabled_map[r['rule_id']]
- elif 'enabled' in r:
+ if 'enabled' in r:
template_rule['enabled'] = r['enabled']
else:
template_rule['enabled'] = True
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index 02d837ee6a..6bb4821ec6 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -128,11 +128,9 @@ class PushRuleRestServlet(ClientV1RestServlet):
# we build up the full structure and then decide which bits of it
# to send which means doing unnecessary work sometimes but is
# is probably not going to make a whole lot of difference
- rawrules = yield self.store.get_push_rules_for_user(user_id)
+ rules = yield self.store.get_push_rules_for_user(user_id)
- enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
-
- rules = format_push_rules_for_user(requester.user, rawrules, enabled_map)
+ rules = format_push_rules_for_user(requester.user, rules)
path = request.postpath[1:]
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index ebb97c8474..786d6f6d67 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -15,6 +15,7 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
+from synapse.push.baserules import list_with_base_rules
from twisted.internet import defer
import logging
@@ -23,6 +24,29 @@ import simplejson as json
logger = logging.getLogger(__name__)
+def _load_rules(rawrules, enabled_map):
+ ruleslist = []
+ for rawrule in rawrules:
+ rule = dict(rawrule)
+ rule["conditions"] = json.loads(rawrule["conditions"])
+ rule["actions"] = json.loads(rawrule["actions"])
+ ruleslist.append(rule)
+
+ # We're going to be mutating this a lot, so do a deep copy
+ rules = list(list_with_base_rules(ruleslist))
+
+ for i, rule in enumerate(rules):
+ rule_id = rule['rule_id']
+ if rule_id in enabled_map:
+ if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+ # Rules are cached across users.
+ rule = dict(rule)
+ rule['enabled'] = bool(enabled_map[rule_id])
+ rules[i] = rule
+
+ return rules
+
+
class PushRuleStore(SQLBaseStore):
@cachedInlineCallbacks(lru=True)
def get_push_rules_for_user(self, user_id):
@@ -42,7 +66,11 @@ class PushRuleStore(SQLBaseStore):
key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
)
- defer.returnValue(rows)
+ enabled_map = yield self.get_push_rules_enabled_for_user(user_id)
+
+ rules = _load_rules(rows, enabled_map)
+
+ defer.returnValue(rules)
@cachedInlineCallbacks(lru=True)
def get_push_rules_enabled_for_user(self, user_id):
@@ -85,6 +113,14 @@ class PushRuleStore(SQLBaseStore):
for row in rows:
results.setdefault(row['user_name'], []).append(row)
+
+ enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
+
+ for user_id, rules in results.items():
+ results[user_id] = _load_rules(
+ rules, enabled_map_by_user.get(user_id, {})
+ )
+
defer.returnValue(results)
@cachedList(cached_method_name="get_push_rules_enabled_for_user",
--
cgit 1.5.1
From 597013caa5e22c7134b6ca6e398659ba76047b15 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Jun 2016 18:01:22 +0100
Subject: Make cachedList go a bit faster
---
synapse/metrics/metric.py | 22 ++++++++++---------
synapse/util/caches/descriptors.py | 44 +++++++++++++++++++++++++++++---------
2 files changed, 46 insertions(+), 20 deletions(-)
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index 368fc24984..6f82b360bc 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -15,6 +15,7 @@
from itertools import chain
+from collections import Counter
# TODO(paul): I can't believe Python doesn't have one of these
@@ -55,30 +56,29 @@ class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
integer that counts events."""
+ __slots__ = ("counts")
+
def __init__(self, *args, **kwargs):
super(CounterMetric, self).__init__(*args, **kwargs)
- self.counts = {}
+ self.counts = Counter()
# Scalar metrics are never empty
if self.is_scalar():
self.counts[()] = 0
def inc_by(self, incr, *values):
- if len(values) != self.dimension():
- raise ValueError(
- "Expected as many values to inc() as labels (%d)" % (self.dimension())
- )
+ # if len(values) != self.dimension():
+ # raise ValueError(
+ # "Expected as many values to inc() as labels (%d)" % (self.dimension())
+ # )
# TODO: should assert that the tag values are all strings
- if values not in self.counts:
- self.counts[values] = incr
- else:
- self.counts[values] += incr
+ self.counts[values] += incr
def inc(self, *values):
- self.inc_by(1, *values)
+ self.counts[values] += 1
def render_item(self, k):
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
@@ -132,6 +132,8 @@ class CacheMetric(object):
This metric generates standard metric name pairs, so that monitoring rules
can easily be applied to measure hit ratio."""
+ __slots__ = ("name", "hits", "total", "size")
+
def __init__(self, name, size_callback, labels=[]):
self.name = name
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 758f5982b0..4bbb16ed3c 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -32,6 +32,7 @@ import os
import functools
import inspect
import threading
+import itertools
logger = logging.getLogger(__name__)
@@ -43,6 +44,14 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
class Cache(object):
+ __slots__ = (
+ "cache",
+ "max_entries",
+ "name",
+ "keylen",
+ "sequence",
+ "thread",
+ )
def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
if lru:
@@ -293,16 +302,21 @@ class CacheListDescriptor(object):
# cached is a dict arg -> deferred, where deferred results in a
# 2-tuple (`arg`, `result`)
- cached = {}
+ results = {}
+ cached_defers = {}
missing = []
for arg in list_args:
key = list(keyargs)
key[self.list_pos] = arg
try:
- res = cache.get(tuple(key)).observe()
- res.addCallback(lambda r, arg: (arg, r), arg)
- cached[arg] = res
+ res = cache.get(tuple(key))
+ if not res.called:
+ res = res.observe()
+ res.addCallback(lambda r, arg: (arg, r), arg)
+ cached_defers[arg] = res
+ else:
+ results[arg] = res.result
except KeyError:
missing.append(arg)
@@ -340,12 +354,22 @@ class CacheListDescriptor(object):
res = observer.observe()
res.addCallback(lambda r, arg: (arg, r), arg)
- cached[arg] = res
-
- return preserve_context_over_deferred(defer.gatherResults(
- cached.values(),
- consumeErrors=True,
- ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res)))
+ cached_defers[arg] = res
+
+ if cached_defers:
+ return preserve_context_over_deferred(defer.gatherResults(
+ cached_defers.values(),
+ consumeErrors=True,
+ ).addCallback(
+ lambda res: {
+ k: v
+ for k, v in itertools.chain(results.items(), res)
+ }
+ )).addErrback(
+ unwrapFirstError
+ )
+ else:
+ return results
obj.__dict__[self.orig.__name__] = wrapped
--
cgit 1.5.1
From e043ede4a2f18a47b67bf19368600183554824f7 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 2 Jun 2016 11:52:32 +0100
Subject: Small optimisation to CacheListDescriptor
---
synapse/metrics/metric.py | 22 ++++++++++------------
synapse/util/async.py | 9 +++++++++
synapse/util/caches/descriptors.py | 4 ++--
3 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index 6f82b360bc..368fc24984 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -15,7 +15,6 @@
from itertools import chain
-from collections import Counter
# TODO(paul): I can't believe Python doesn't have one of these
@@ -56,29 +55,30 @@ class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
integer that counts events."""
- __slots__ = ("counts")
-
def __init__(self, *args, **kwargs):
super(CounterMetric, self).__init__(*args, **kwargs)
- self.counts = Counter()
+ self.counts = {}
# Scalar metrics are never empty
if self.is_scalar():
self.counts[()] = 0
def inc_by(self, incr, *values):
- # if len(values) != self.dimension():
- # raise ValueError(
- # "Expected as many values to inc() as labels (%d)" % (self.dimension())
- # )
+ if len(values) != self.dimension():
+ raise ValueError(
+ "Expected as many values to inc() as labels (%d)" % (self.dimension())
+ )
# TODO: should assert that the tag values are all strings
- self.counts[values] += incr
+ if values not in self.counts:
+ self.counts[values] = incr
+ else:
+ self.counts[values] += incr
def inc(self, *values):
- self.counts[values] += 1
+ self.inc_by(1, *values)
def render_item(self, k):
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
@@ -132,8 +132,6 @@ class CacheMetric(object):
This metric generates standard metric name pairs, so that monitoring rules
can easily be applied to measure hit ratio."""
- __slots__ = ("name", "hits", "total", "size")
-
def __init__(self, name, size_callback, labels=[]):
self.name = name
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 0d6f48e2d8..40be7fe7e3 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -102,6 +102,15 @@ class ObservableDeferred(object):
def observers(self):
return self._observers
+ def has_called(self):
+ return self._result is not None
+
+ def has_succeeded(self):
+ return self._result is not None and self._result[0] is True
+
+ def get_result(self):
+ return self._result[1]
+
def __getattr__(self, name):
return getattr(self._deferred, name)
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 4bbb16ed3c..5be4097279 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -311,12 +311,12 @@ class CacheListDescriptor(object):
try:
res = cache.get(tuple(key))
- if not res.called:
+ if not res.has_succeeded():
res = res.observe()
res.addCallback(lambda r, arg: (arg, r), arg)
cached_defers[arg] = res
else:
- results[arg] = res.result
+ results[arg] = res.get_result()
except KeyError:
missing.append(arg)
--
cgit 1.5.1
From 81cf449daa8b310899014f5564f5fdf10289e79c Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 11:19:27 +0100
Subject: Add methods to events, account data and receipt slaves
Adds the methods needed by /sync to the slaved events,
account data and receipt stores.
---
synapse/replication/slave/storage/account_data.py | 41 ++++++++++++++++++++++-
synapse/replication/slave/storage/events.py | 21 +++++++++---
synapse/replication/slave/storage/receipts.py | 25 +++++++++++++-
3 files changed, 81 insertions(+), 6 deletions(-)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index f59b0eabbc..735c03c7eb 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -15,7 +15,10 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
from synapse.storage.account_data import AccountDataStore
+from synapse.storage.tags import TagsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
class SlavedAccountDataStore(BaseSlavedStore):
@@ -25,6 +28,14 @@ class SlavedAccountDataStore(BaseSlavedStore):
self._account_data_id_gen = SlavedIdTracker(
db_conn, "account_data_max_stream_id", "stream_id",
)
+ self._account_data_stream_cache = StreamChangeCache(
+ "AccountDataAndTagsChangeCache",
+ self._account_data_id_gen.get_current_token(),
+ )
+
+ get_account_data_for_user = (
+ AccountDataStore.__dict__["get_account_data_for_user"]
+ )
get_global_account_data_by_type_for_users = (
AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
@@ -34,6 +45,16 @@ class SlavedAccountDataStore(BaseSlavedStore):
AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
)
+ get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+
+ get_updated_tags = DataStore.get_updated_tags.__func__
+ get_updated_account_data_for_user = (
+ DataStore.get_updated_account_data_for_user.__func__
+ )
+
+ def get_max_account_data_stream_id(self):
+ return self._account_data_id_gen.get_current_token()
+
def stream_positions(self):
result = super(SlavedAccountDataStore, self).stream_positions()
position = self._account_data_id_gen.get_current_token()
@@ -47,15 +68,33 @@ class SlavedAccountDataStore(BaseSlavedStore):
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
- user_id, data_type = row[1:3]
+ position, user_id, data_type = row[:3]
self.get_global_account_data_by_type_for_user.invalidate(
(data_type, user_id,)
)
+ self.get_account_data_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
stream = result.get("room_account_data")
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.get_account_data_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
stream = result.get("tag_account_data")
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.get_tags_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ return super(SlavedAccountDataStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index c0d741452d..cbc1ae4190 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.event_push_actions import EventPushActionsStore
from synapse.storage.state import StateStore
+from synapse.storage.stream import StreamStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
import ujson as json
@@ -57,6 +58,9 @@ class SlavedEventStore(BaseSlavedStore):
"EventsRoomStreamChangeCache", min_event_val,
prefilled_cache=event_cache_prefill,
)
+ self._membership_stream_cache = StreamChangeCache(
+ "MembershipStreamChangeCache", events_max,
+ )
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
@@ -87,6 +91,9 @@ class SlavedEventStore(BaseSlavedStore):
_get_state_group_from_group = (
StateStore.__dict__["_get_state_group_from_group"]
)
+ get_recent_event_ids_for_room = (
+ StreamStore.__dict__["get_recent_event_ids_for_room"]
+ )
get_unread_push_actions_for_user_in_range = (
DataStore.get_unread_push_actions_for_user_in_range.__func__
@@ -109,10 +116,16 @@ class SlavedEventStore(BaseSlavedStore):
DataStore.get_room_events_stream_for_room.__func__
)
get_events_around = DataStore.get_events_around.__func__
+ get_state_for_event = DataStore.get_state_for_event.__func__
get_state_for_events = DataStore.get_state_for_events.__func__
get_state_groups = DataStore.get_state_groups.__func__
+ get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
+ get_room_events_stream_for_rooms = (
+ DataStore.get_room_events_stream_for_rooms.__func__
+ )
+ get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
- _set_before_and_after = DataStore._set_before_and_after
+ _set_before_and_after = staticmethod(DataStore._set_before_and_after)
_get_events = DataStore._get_events.__func__
_get_events_from_cache = DataStore._get_events_from_cache.__func__
@@ -220,9 +233,9 @@ class SlavedEventStore(BaseSlavedStore):
self.get_rooms_for_user.invalidate((event.state_key,))
# self.get_joined_hosts_for_room.invalidate((event.room_id,))
self.get_users_in_room.invalidate((event.room_id,))
- # self._membership_stream_cache.entity_has_changed(
- # event.state_key, event.internal_metadata.stream_ordering
- # )
+ self._membership_stream_cache.entity_has_changed(
+ event.state_key, event.internal_metadata.stream_ordering
+ )
self.get_invited_rooms_for_user.invalidate((event.state_key,))
if not event.is_state():
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index ec007516d0..ac9662d399 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.storage.receipts import ReceiptsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
# So, um, we want to borrow a load of functions intended for reading from
# a DataStore, but we don't want to take functions that either write to the
@@ -37,11 +38,28 @@ class SlavedReceiptsStore(BaseSlavedStore):
db_conn, "receipts_linearized", "stream_id"
)
+ self._receipts_stream_cache = StreamChangeCache(
+ "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
+ )
+
get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+ get_linearized_receipts_for_room = (
+ ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
+ )
+ _get_linearized_receipts_for_rooms = (
+ ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
+ )
+ get_last_receipt_event_id_for_user = (
+ ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
+ )
get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+ get_linearized_receipts_for_rooms = (
+ DataStore.get_linearized_receipts_for_rooms.__func__
+ )
+
def stream_positions(self):
result = super(SlavedReceiptsStore, self).stream_positions()
result["receipts"] = self._receipts_id_gen.get_current_token()
@@ -52,10 +70,15 @@ class SlavedReceiptsStore(BaseSlavedStore):
if stream:
self._receipts_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
- room_id, receipt_type, user_id = row[1:4]
+ position, room_id, receipt_type, user_id = row[:4]
self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
+ self._receipts_stream_cache.entity_has_changed(room_id, position)
return super(SlavedReceiptsStore, self).process_replication(result)
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
+ self.get_linearized_receipts_for_room.invalidate_many((room_id,))
+ self.get_last_receipt_event_id_for_user.invalidate(
+ (user_id, room_id, receipt_type)
+ )
--
cgit 1.5.1
From ccb56fc24bd7d2675dc21796b29333538ca0d5fa Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 2 Jun 2016 12:47:06 +0100
Subject: Make get_joined_hosts_for_room use get_users_in_room
---
synapse/storage/roommember.py | 19 +++----------------
1 file changed, 3 insertions(+), 16 deletions(-)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 41b395e07c..64b4bd371b 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -238,23 +238,10 @@ class RoomMemberStore(SQLBaseStore):
return results
- @cached(max_entries=5000)
+ @cachedInlineCallbacks(max_entries=5000)
def get_joined_hosts_for_room(self, room_id):
- return self.runInteraction(
- "get_joined_hosts_for_room",
- self._get_joined_hosts_for_room_txn,
- room_id,
- )
-
- def _get_joined_hosts_for_room_txn(self, txn, room_id):
- rows = self._get_members_rows_txn(
- txn,
- room_id, membership=Membership.JOIN
- )
-
- joined_domains = set(get_domain_from_id(r["user_id"]) for r in rows)
-
- return joined_domains
+ user_ids = yield self.get_users_in_room(room_id)
+ defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids))
def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
rows = self._get_members_rows_txn(
--
cgit 1.5.1
From 4c04222fa55d35ad3a75c5538ec477046b6c5b30 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 2 Jun 2016 13:02:33 +0100
Subject: Poke notifier on next reactor tick
---
synapse/handlers/message.py | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c41dafdef5..15caf1950a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -26,9 +26,9 @@ from synapse.types import (
UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
)
from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
+from synapse.util.async import concurrently_execute, run_on_reactor
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import preserve_fn
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -908,13 +908,16 @@ class MessageHandler(BaseHandler):
"Failed to get destination from event %s", s.event_id
)
- with PreserveLoggingContext():
- # Don't block waiting on waking up all the listeners.
+ @defer.inlineCallbacks
+ def _notify():
+ yield run_on_reactor()
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
+ preserve_fn(_notify)()
+
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
--
cgit 1.5.1
From 73c711243382a48b9b67fddf5ed9df2d1ee1be43 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 2 Jun 2016 11:29:44 +0100
Subject: Change CacheMetrics to be quicker
We change it so that each cache has an individual CacheMetric, instead
of having one global CacheMetric. This means that when a cache tries to
increment a counter it does not need to go through so many indirections.
---
synapse/metrics/__init__.py | 16 ++++-------
synapse/metrics/metric.py | 44 +++++++++++++++---------------
synapse/util/caches/__init__.py | 20 ++++++++++----
synapse/util/caches/descriptors.py | 17 +++++++++---
synapse/util/caches/dictionary_cache.py | 8 +++---
synapse/util/caches/expiringcache.py | 8 +++---
synapse/util/caches/stream_change_cache.py | 16 +++++------
tests/metrics/test_metric.py | 23 +++++++---------
8 files changed, 82 insertions(+), 70 deletions(-)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 5664d5a381..c38f24485a 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -33,11 +33,7 @@ from .metric import (
logger = logging.getLogger(__name__)
-# We'll keep all the available metrics in a single toplevel dict, one shared
-# for the entire process. We don't currently support per-HomeServer instances
-# of metrics, because in practice any one python VM will host only one
-# HomeServer anyway. This makes a lot of implementation neater
-all_metrics = {}
+all_metrics = []
class Metrics(object):
@@ -53,7 +49,7 @@ class Metrics(object):
metric = metric_class(full_name, *args, **kwargs)
- all_metrics[full_name] = metric
+ all_metrics.append(metric)
return metric
def register_counter(self, *args, **kwargs):
@@ -84,12 +80,12 @@ def render_all():
# TODO(paul): Internal hack
update_resource_metrics()
- for name in sorted(all_metrics.keys()):
+ for metric in all_metrics:
try:
- strs += all_metrics[name].render()
+ strs += metric.render()
except Exception:
- strs += ["# FAILED to render %s" % name]
- logger.exception("Failed to render %s metric", name)
+ strs += ["# FAILED to render"]
+ logger.exception("Failed to render metric")
strs.append("") # to generate a final CRLF
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index 368fc24984..341043952a 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -47,9 +47,6 @@ class BaseMetric(object):
for k, v in zip(self.labels, values)])
)
- def render(self):
- return map_concat(self.render_item, sorted(self.counts.keys()))
-
class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
@@ -83,6 +80,9 @@ class CounterMetric(BaseMetric):
def render_item(self, k):
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
+ def render(self):
+ return map_concat(self.render_item, sorted(self.counts.keys()))
+
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
@@ -126,30 +126,30 @@ class DistributionMetric(object):
class CacheMetric(object):
- """A combination of two CounterMetrics, one to count cache hits and one to
- count a total, and a callback metric to yield the current size.
+ __slots__ = ("name", "cache_name", "hits", "misses", "size_callback")
- This metric generates standard metric name pairs, so that monitoring rules
- can easily be applied to measure hit ratio."""
-
- def __init__(self, name, size_callback, labels=[]):
+ def __init__(self, name, size_callback, cache_name):
self.name = name
+ self.cache_name = cache_name
- self.hits = CounterMetric(name + ":hits", labels=labels)
- self.total = CounterMetric(name + ":total", labels=labels)
+ self.hits = 0
+ self.misses = 0
- self.size = CallbackMetric(
- name + ":size",
- callback=size_callback,
- labels=labels,
- )
+ self.size_callback = size_callback
- def inc_hits(self, *values):
- self.hits.inc(*values)
- self.total.inc(*values)
+ def inc_hits(self):
+ self.hits += 1
- def inc_misses(self, *values):
- self.total.inc(*values)
+ def inc_misses(self):
+ self.misses += 1
def render(self):
- return self.hits.render() + self.total.render() + self.size.render()
+ size = self.size_callback()
+ hits = self.hits
+ total = self.misses + self.hits
+
+ return [
+ """%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
+ """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
+ """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
+ ]
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index d53569ca49..ebd715c5dc 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -24,11 +24,21 @@ DEBUG_CACHES = False
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {}
-cache_counter = metrics.register_cache(
- "cache",
- lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
- labels=["name"],
-)
+# cache_counter = metrics.register_cache(
+# "cache",
+# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
+# labels=["name"],
+# )
+
+
+def register_cache(name, cache):
+ caches_by_name[name] = cache
+ return metrics.register_cache(
+ "cache",
+ lambda: len(cache),
+ name,
+ )
+
_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
caches_by_name["string_cache"] = _string_cache
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 758f5982b0..5d25c9e762 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -22,7 +22,7 @@ from synapse.util.logcontext import (
PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
)
-from . import caches_by_name, DEBUG_CACHES, cache_counter
+from . import DEBUG_CACHES, register_cache
from twisted.internet import defer
@@ -43,6 +43,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
class Cache(object):
+ __slots__ = (
+ "cache",
+ "max_entries",
+ "name",
+ "keylen",
+ "sequence",
+ "thread",
+ "metrics",
+ )
def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
if lru:
@@ -59,7 +68,7 @@ class Cache(object):
self.keylen = keylen
self.sequence = 0
self.thread = None
- caches_by_name[name] = self.cache
+ self.metrics = register_cache(name, self.cache)
def check_thread(self):
expected_thread = self.thread
@@ -74,10 +83,10 @@ class Cache(object):
def get(self, key, default=_CacheSentinel):
val = self.cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
return val
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
if default is _CacheSentinel:
raise KeyError()
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index f92d80542b..b0ca1bb79d 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -15,7 +15,7 @@
from synapse.util.caches.lrucache import LruCache
from collections import namedtuple
-from . import caches_by_name, cache_counter
+from . import register_cache
import threading
import logging
@@ -43,7 +43,7 @@ class DictionaryCache(object):
__slots__ = []
self.sentinel = Sentinel()
- caches_by_name[name] = self.cache
+ self.metrics = register_cache(name, self.cache)
def check_thread(self):
expected_thread = self.thread
@@ -58,7 +58,7 @@ class DictionaryCache(object):
def get(self, key, dict_keys=None):
entry = self.cache.get(key, self.sentinel)
if entry is not self.sentinel:
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
if dict_keys is None:
return DictionaryEntry(entry.full, dict(entry.value))
@@ -69,7 +69,7 @@ class DictionaryCache(object):
if k in entry.value
})
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
return DictionaryEntry(False, {})
def invalidate(self, key):
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 2b68c1ac93..080388958f 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache
import logging
@@ -49,7 +49,7 @@ class ExpiringCache(object):
self._cache = {}
- caches_by_name[cache_name] = self._cache
+ self.metrics = register_cache(cache_name, self._cache)
def start(self):
if not self._expiry_ms:
@@ -78,9 +78,9 @@ class ExpiringCache(object):
def __getitem__(self, key):
try:
entry = self._cache[key]
- cache_counter.inc_hits(self._cache_name)
+ self.metrics.inc_hits()
except KeyError:
- cache_counter.inc_misses(self._cache_name)
+ self.metrics.inc_misses()
raise
if self._reset_expiry_on_get:
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index ea8a74ca69..3c051dabc4 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache
from blist import sorteddict
@@ -42,7 +42,7 @@ class StreamChangeCache(object):
self._cache = sorteddict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
- caches_by_name[self.name] = self._cache
+ self.metrics = register_cache(self.name, self._cache)
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
@@ -53,19 +53,19 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos < self._earliest_known_stream_pos:
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
return True
latest_entity_change_pos = self._entity_to_key.get(entity, None)
if latest_entity_change_pos is None:
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
return False
if stream_pos < latest_entity_change_pos:
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
return True
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
return False
def get_entities_changed(self, entities, stream_pos):
@@ -82,10 +82,10 @@ class StreamChangeCache(object):
self._cache[k] for k in keys[i:]
).intersection(entities)
- cache_counter.inc_hits(self.name)
+ self.metrics.inc_hits()
else:
result = entities
- cache_counter.inc_misses(self.name)
+ self.metrics.inc_misses()
return result
diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py
index f3c1927ce1..f85455a5af 100644
--- a/tests/metrics/test_metric.py
+++ b/tests/metrics/test_metric.py
@@ -61,9 +61,6 @@ class CounterMetricTestCase(unittest.TestCase):
'vector{method="PUT"} 1',
])
- # Check that passing too few values errors
- self.assertRaises(ValueError, counter.inc)
-
class CallbackMetricTestCase(unittest.TestCase):
@@ -138,27 +135,27 @@ class CacheMetricTestCase(unittest.TestCase):
def test_cache(self):
d = dict()
- metric = CacheMetric("cache", lambda: len(d))
+ metric = CacheMetric("cache", lambda: len(d), "cache_name")
self.assertEquals(metric.render(), [
- 'cache:hits 0',
- 'cache:total 0',
- 'cache:size 0',
+ 'cache:hits{name="cache_name"} 0',
+ 'cache:total{name="cache_name"} 0',
+ 'cache:size{name="cache_name"} 0',
])
metric.inc_misses()
d["key"] = "value"
self.assertEquals(metric.render(), [
- 'cache:hits 0',
- 'cache:total 1',
- 'cache:size 1',
+ 'cache:hits{name="cache_name"} 0',
+ 'cache:total{name="cache_name"} 1',
+ 'cache:size{name="cache_name"} 1',
])
metric.inc_hits()
self.assertEquals(metric.render(), [
- 'cache:hits 1',
- 'cache:total 2',
- 'cache:size 1',
+ 'cache:hits{name="cache_name"} 1',
+ 'cache:total{name="cache_name"} 2',
+ 'cache:size{name="cache_name"} 1',
])
--
cgit 1.5.1
From 58a224a6515dceacebc729f1e6fbb87a22f3a35a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 11:47:07 +0100
Subject: Pull out update_results_dict
---
synapse/util/caches/descriptors.py | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 5be4097279..799fd2a9c6 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -32,7 +32,7 @@ import os
import functools
import inspect
import threading
-import itertools
+
logger = logging.getLogger(__name__)
@@ -357,17 +357,16 @@ class CacheListDescriptor(object):
cached_defers[arg] = res
if cached_defers:
+ def update_results_dict(res):
+ results.update(res)
+ return results
+
return preserve_context_over_deferred(defer.gatherResults(
cached_defers.values(),
consumeErrors=True,
- ).addCallback(
- lambda res: {
- k: v
- for k, v in itertools.chain(results.items(), res)
- }
- )).addErrback(
+ ).addCallback(update_results_dict).addErrback(
unwrapFirstError
- )
+ ))
else:
return results
--
cgit 1.5.1
From abb151f3c9bf78f2825dba18da6bbc88ce61d32c Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 11:57:26 +0100
Subject: Add a separate process that can handle /sync requests
---
synapse/app/synchrotron.py | 467 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 467 insertions(+)
create mode 100644 synapse/app/synchrotron.py
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
new file mode 100644
index 0000000000..f592ad352e
--- /dev/null
+++ b/synapse/app/synchrotron.py
@@ -0,0 +1,467 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.api.constants import EventTypes
+from synapse.config._base import ConfigError
+from synapse.config.database import DatabaseConfig
+from synapse.config.logger import LoggingConfig
+from synapse.config.appservice import AppServiceConfig
+from synapse.events import FrozenEvent
+from synapse.handlers.presence import PresenceHandler
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.rest.client.v2_alpha import sync
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import UserPresenceState
+from synapse.storage.roommember import RoomMemberStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.stringutils import random_string
+from synapse.util.versionstring import get_version_string
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import contextlib
+import ujson as json
+
+logger = logging.getLogger("synapse.app.synchrotron")
+
+
+class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
+ def read_config(self, config):
+ self.replication_url = config["replication_url"]
+ self.server_name = config["server_name"]
+ self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
+ "use_insecure_ssl_client_just_for_testing_do_not_use", False
+ )
+ self.user_agent_suffix = None
+ self.listeners = config["listeners"]
+ self.soft_file_limit = config.get("soft_file_limit")
+ self.daemonize = config.get("daemonize")
+ self.pid_file = self.abspath(config.get("pid_file"))
+ self.macaroon_secret_key = config["macaroon_secret_key"]
+ self.expire_access_token = config.get("expire_access_token", False)
+
+ def default_config(self, server_name, **kwargs):
+ pid_file = self.abspath("synchroton.pid")
+ return """\
+ # Slave configuration
+
+ # The replication listener on the synapse to talk to.
+ #replication_url: https://localhost:{replication_port}/_synapse/replication
+
+ server_name: "%(server_name)s"
+
+ listeners:
+ # Enable a /sync listener on the synchrontron
+ #- type: http
+ # port: {http_port}
+ # bind_address: ""
+ # Enable a ssh manhole listener on the synchrotron
+ # - type: manhole
+ # port: {manhole_port}
+ # bind_address: 127.0.0.1
+ # Enable a metric listener on the synchrotron
+ # - type: http
+ # port: {metrics_port}
+ # bind_address: 127.0.0.1
+ # resources:
+ # - names: ["metrics"]
+ # compress: False
+
+ report_stats: False
+
+ daemonize: False
+
+ pid_file: %(pid_file)s
+ """ % locals()
+
+
+class SynchrotronSlavedStore(
+ SlavedPushRuleStore,
+ SlavedEventStore,
+ SlavedReceiptsStore,
+ SlavedAccountDataStore,
+ SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+ SlavedFilteringStore,
+ SlavedPresenceStore,
+):
+ def get_presence_list_accepted(self, user_localpart):
+ return ()
+
+ def insert_client_ip(self, user, access_token, ip, user_agent):
+ pass
+
+ # XXX: This is a bit broken because we don't persist forgotten rooms
+ # in a way that they can be streamed. This means that we don't have a
+ # way to invalidate the forgotten rooms cache correctly.
+ # For now we expire the cache every 10 minutes.
+ BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
+ who_forgot_in_room = (
+ RoomMemberStore.__dict__["who_forgot_in_room"]
+ )
+
+
+class SynchrotronPresence(object):
+ def __init__(self, hs):
+ self.http_client = hs.get_simple_http_client()
+ self.store = hs.get_datastore()
+ self.user_to_num_current_syncs = {}
+ self.syncing_users_url = hs.config.replication_url + "/syncing_users"
+ self.clock = hs.get_clock()
+
+ active_presence = self.store.take_presence_startup_info()
+ self.user_to_current_state = {
+ state.user_id: state
+ for state in active_presence
+ }
+
+ self.process_id = random_string(16)
+ logger.info("Presence process_id is %r", self.process_id)
+
+ def set_state(self, user, state):
+ # TODO Hows this supposed to work?
+ pass
+
+ get_states = PresenceHandler.get_states.__func__
+ current_state_for_users = PresenceHandler.current_state_for_users.__func__
+
+ @defer.inlineCallbacks
+ def user_syncing(self, user_id, affect_presence):
+ if affect_presence:
+ curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+ self.user_to_num_current_syncs[user_id] = curr_sync + 1
+ # TODO: Send this less frequently.
+ # TODO: Make sure this doesn't race. Currently we can lose updates
+ # if two users come online in quick sucession and the second http
+ # to the master completes before the first.
+ # TODO: Don't block the sync request on this HTTP hit.
+ yield self._send_syncing_users()
+
+ def _end():
+ if affect_presence:
+ self.user_to_num_current_syncs[user_id] -= 1
+
+ @contextlib.contextmanager
+ def _user_syncing():
+ try:
+ yield
+ finally:
+ _end()
+
+ defer.returnValue(_user_syncing())
+
+ def _send_syncing_users(self):
+ return self.http_client.post_json_get_json(self.syncing_users_url, {
+ "process_id": self.process_id,
+ "syncing_users": [
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count > 0
+ ],
+ })
+
+ def process_replication(self, result):
+ stream = result.get("presence", {"rows": []})
+ for row in stream["rows"]:
+ (
+ position, user_id, state, last_active_ts,
+ last_federation_update_ts, last_user_sync_ts, status_msg,
+ currently_active
+ ) = row
+ self.user_to_current_state[user_id] = UserPresenceState(
+ user_id, state, last_active_ts,
+ last_federation_update_ts, last_user_sync_ts, status_msg,
+ currently_active
+ )
+
+
+class SynchrotronTyping(object):
+ def __init__(self, hs):
+ self._latest_room_serial = 0
+ self._room_serials = {}
+ self._room_typing = {}
+
+ def stream_positions(self):
+ return {"typing": self._latest_room_serial}
+
+ def process_replication(self, result):
+ stream = result.get("typing")
+ if stream:
+ self._latest_room_serial = int(stream["position"])
+
+ for row in stream["rows"]:
+ position, room_id, typing_json = row
+ typing = json.loads(typing_json)
+ self._room_serials[room_id] = position
+ self._room_typing[room_id] = typing
+
+
+class SynchrotronApplicationService(object):
+ def notify_interested_services(self, event):
+ pass
+
+
+class SynchrotronServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+ elif name == "client":
+ resource = JsonResource(self, canonical_json=False)
+ sync.register_servlets(self, resource)
+ resources.update({
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ })
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse synchrotron now listening on port %d", port)
+
+ def start_listening(self):
+ for listener in self.config.listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.replication_url
+ clock = self.get_clock()
+ notifier = self.get_notifier()
+ presence_handler = self.get_presence_handler()
+ typing_handler = self.get_typing_handler()
+
+ def expire_broken_caches():
+ store.who_forgot_in_room.invalidate_all()
+
+ def notify_from_stream(
+ result, stream_name, stream_key, room=None, user=None
+ ):
+ stream = result.get(stream_name)
+ if stream:
+ position_index = stream["field_names"].index("position")
+ if room:
+ room_index = stream["field_names"].index(room)
+ if user:
+ user_index = stream["field_names"].index(user)
+
+ users = ()
+ rooms = ()
+ for row in stream["rows"]:
+ position = row[position_index]
+
+ if user:
+ users = (row[user_index],)
+
+ if room:
+ rooms = (row[room_index],)
+
+ notifier.on_new_event(
+ stream_key, position, users=users, rooms=rooms
+ )
+
+ def notify(result):
+ stream = result.get("events")
+ if stream:
+ max_position = stream["position"]
+ for row in stream["rows"]:
+ position = row[0]
+ internal = json.loads(row[1])
+ event_json = json.loads(row[2])
+ event = FrozenEvent(event_json, internal_metadata_dict=internal)
+ extra_users = ()
+ if event.type == EventTypes.Member:
+ extra_users = (event.state_key,)
+ notifier.on_new_room_event(
+ event, position, max_position, extra_users
+ )
+
+ notify_from_stream(
+ result, "push_rules", "push_rules_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "user_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "room_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "tag_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "receipts", "receipt_key", room="room_id"
+ )
+ notify_from_stream(
+ result, "typing", "typing_key", room="room_id"
+ )
+
+ next_expire_broken_caches_ms = 0
+ while True:
+ try:
+ args = store.stream_positions()
+ args.update(typing_handler.stream_positions())
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ now_ms = clock.time_msec()
+ if now_ms > next_expire_broken_caches_ms:
+ expire_broken_caches()
+ next_expire_broken_caches_ms = (
+ now_ms + store.BROKEN_CACHE_EXPIRY_MS
+ )
+ yield store.process_replication(result)
+ typing_handler.process_replication(result)
+ presence_handler.process_replication(result)
+ notify(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ sleep(5)
+
+ def build_presence_handler(self):
+ return SynchrotronPresence(self)
+
+ def build_typing_handler(self):
+ return SynchrotronTyping(self)
+
+
+def setup(config_options):
+ try:
+ config = SynchrotronConfig.load_config(
+ "Synapse synchrotron", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ if not config:
+ sys.exit(0)
+
+ config.setup_logging()
+
+ database_engine = create_engine(config.database_config)
+
+ ss = SynchrotronServer(
+ config.server_name,
+ db_config=config.database_config,
+ config=config,
+ version_string=get_version_string("Synapse", synapse),
+ database_engine=database_engine,
+ application_service_handler=SynchrotronApplicationService(),
+ )
+
+ ss.setup()
+ ss.start_listening()
+
+ change_resource_limit(ss.config.soft_file_limit)
+
+ def start():
+ ss.get_datastore().start_profiling()
+ ss.replicate()
+
+ reactor.callWhenRunning(start)
+
+ return ss
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ ps = setup(sys.argv[1:])
+
+ if ps.config.daemonize:
+ def run():
+ with LoggingContext("run"):
+ change_resource_limit(ps.config.soft_file_limit)
+ reactor.run()
+
+ daemon = Daemonize(
+ app="synapse-pusher",
+ pid=ps.config.pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+
+ daemon.start()
+ else:
+ reactor.run()
--
cgit 1.5.1
From a7ff5a17702812ae586228396d534a8ed3d88475 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 13:40:55 +0100
Subject: Presence metrics. Change def of small delta
---
synapse/handlers/presence.py | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fc8538b41e..eb877763ee 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -50,6 +50,9 @@ timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
bump_active_time_counter = metrics.register_counter("bump_active_time")
+full_update_presence_counter = metrics.register_counter("full_update_presence")
+partial_update_presence_counter = metrics.register_counter("partial_update_presence")
+
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
@@ -974,13 +977,13 @@ class PresenceEventSource(object):
user_ids_changed = set()
changed = None
- if from_key and max_token - from_key < 100:
- # For small deltas, its quicker to get all changes and then
- # work out if we share a room or they're in our presence list
+ if from_key:
changed = stream_change_cache.get_all_entities_changed(from_key)
- # get_all_entities_changed can return None
- if changed is not None:
+ if changed is not None and len(changed) < 100:
+ # For small deltas, its quicker to get all changes and then
+ # work out if we share a room or they're in our presence list
+ partial_update_presence_counter.inc()
for other_user_id in changed:
if other_user_id in friends:
user_ids_changed.add(other_user_id)
@@ -992,6 +995,8 @@ class PresenceEventSource(object):
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
+ full_update_presence_counter.inc()
+
user_ids_to_check = set()
for room_id in room_ids:
users = yield self.store.get_users_in_room(room_id)
--
cgit 1.5.1
From 4ce84a1acd89a7f61896e92605e5463864848122 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 13:49:16 +0100
Subject: Change metric style
---
synapse/handlers/presence.py | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index eb877763ee..0e19f777b8 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -50,8 +50,7 @@ timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
bump_active_time_counter = metrics.register_counter("bump_active_time")
-full_update_presence_counter = metrics.register_counter("full_update_presence")
-partial_update_presence_counter = metrics.register_counter("partial_update_presence")
+get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
@@ -980,10 +979,10 @@ class PresenceEventSource(object):
if from_key:
changed = stream_change_cache.get_all_entities_changed(from_key)
- if changed is not None and len(changed) < 100:
+ if changed is not None and len(changed) < 500:
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
- partial_update_presence_counter.inc()
+ get_updates_counter.inc("stream")
for other_user_id in changed:
if other_user_id in friends:
user_ids_changed.add(other_user_id)
@@ -995,7 +994,7 @@ class PresenceEventSource(object):
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
- full_update_presence_counter.inc()
+ get_updates_counter.inc("full")
user_ids_to_check = set()
for room_id in room_ids:
--
cgit 1.5.1
From ab116bdb0c2d8e295b1473af84c453d212dc07ea Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 14:03:42 +0100
Subject: Fix typo
---
synapse/handlers/typing.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3c54307bed..861b8f7989 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -140,7 +140,7 @@ class TypingHandler(object):
def user_left_room(self, user, room_id):
user_id = user.to_string()
if self.is_mine_id(user_id):
- member = RoomMember(room_id=room_id, user=user_id)
+ member = RoomMember(room_id=room_id, user_id=user_id)
yield self._stopped_typing(member)
@defer.inlineCallbacks
--
cgit 1.5.1
From 80aade380545a0b661e2bbef48e175900ed4d41f Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 14:24:19 +0100
Subject: Send updates to the syncing users every ten seconds or immediately if
they've just come online
---
synapse/app/synchrotron.py | 53 +++++++++++++++++++++++++++++++++++++---------
1 file changed, 43 insertions(+), 10 deletions(-)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index f592ad352e..7b45c87a96 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,7 +16,7 @@
import synapse
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, PresenceState
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
@@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
@@ -135,6 +135,8 @@ class SynchrotronSlavedStore(
RoomMemberStore.__dict__["who_forgot_in_room"]
)
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
class SynchrotronPresence(object):
def __init__(self, hs):
@@ -153,6 +155,13 @@ class SynchrotronPresence(object):
self.process_id = random_string(16)
logger.info("Presence process_id is %r", self.process_id)
+ self._sending_sync = False
+ self._need_to_send_sync = False
+ self.clock.looping_call(
+ self._send_syncing_users_regularly,
+ UPDATE_SYNCING_USERS_MS,
+ )
+
def set_state(self, user, state):
# TODO Hows this supposed to work?
pass
@@ -165,12 +174,10 @@ class SynchrotronPresence(object):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
- # TODO: Send this less frequently.
- # TODO: Make sure this doesn't race. Currently we can lose updates
- # if two users come online in quick sucession and the second http
- # to the master completes before the first.
- # TODO: Don't block the sync request on this HTTP hit.
- yield self._send_syncing_users()
+ prev_states = yield self.current_state_for_users([user_id])
+ if prev_states[user_id].state == PresenceState.OFFLINE:
+ # TODO: Don't block the sync request on this HTTP hit.
+ yield self._send_syncing_users_now()
def _end():
if affect_presence:
@@ -185,8 +192,24 @@ class SynchrotronPresence(object):
defer.returnValue(_user_syncing())
- def _send_syncing_users(self):
- return self.http_client.post_json_get_json(self.syncing_users_url, {
+ def _send_syncing_users_regularly(self):
+ # Only send an update if we aren't in the middle of sending one.
+ if not self._sending_sync:
+ preserve_fn(self._send_syncing_users_now)()
+
+ @defer.inlineCallbacks
+ def _send_syncing_users_now(self):
+ if self._sending_sync:
+ # We don't want to race with sending another update.
+ # Instead we wait for that update to finish and send another
+ # update afterwards.
+ self._need_to_send_sync = True
+ return
+
+ # Flag that we are sending an update.
+ self._sending_sync = True
+
+ yield self.http_client.post_json_get_json(self.syncing_users_url, {
"process_id": self.process_id,
"syncing_users": [
user_id for user_id, count in self.user_to_num_current_syncs.items()
@@ -194,6 +217,16 @@ class SynchrotronPresence(object):
],
})
+ # Unset the flag as we are no longer sending an update.
+ self._sending_sync = False
+ if self._need_to_send_sync:
+ # If something happened while we were sending the update then
+ # we might need to send another update.
+ # TODO: Check if the update that was sent matches the current state
+ # as we only need to send an update if they are different.
+ self._need_to_send_sync = False
+ yield self._send_syncing_users_now()
+
def process_replication(self, result):
stream = result.get("presence", {"rows": []})
for row in stream["rows"]:
--
cgit 1.5.1
From eef541a2919649e6d756d45a29d47fe76cfe02e2 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 14:42:35 +0100
Subject: Move insert_client_ip to a separate class
---
synapse/storage/__init__.py | 48 ++----------------------------
synapse/storage/client_ips.py | 68 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 71 insertions(+), 45 deletions(-)
create mode 100644 synapse/storage/client_ips.py
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 6928a213e8..e93c3de66c 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
-from ._base import Cache, LoggingTransaction
+from ._base import LoggingTransaction
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
@@ -45,6 +45,7 @@ from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
from .openid import OpenIdStore
+from .client_ips import ClientIpStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
@@ -58,12 +59,6 @@ import logging
logger = logging.getLogger(__name__)
-# Number of msec of granularity to store the user IP 'last seen' time. Smaller
-# times give more inserts into the database even for readonly API hits
-# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
-
-
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore,
PresenceStore, TransactionStore,
@@ -84,6 +79,7 @@ class DataStore(RoomMemberStore, RoomStore,
AccountDataStore,
EventPushActionsStore,
OpenIdStore,
+ ClientIpStore,
):
def __init__(self, db_conn, hs):
@@ -91,11 +87,6 @@ class DataStore(RoomMemberStore, RoomStore,
self._clock = hs.get_clock()
self.database_engine = hs.database_engine
- self.client_ip_last_seen = Cache(
- name="client_ip_last_seen",
- keylen=4,
- )
-
self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering",
extra_tables=[("local_invites", "stream_id")]
@@ -216,39 +207,6 @@ class DataStore(RoomMemberStore, RoomStore,
return [UserPresenceState(**row) for row in rows]
- @defer.inlineCallbacks
- def insert_client_ip(self, user, access_token, ip, user_agent):
- now = int(self._clock.time_msec())
- key = (user.to_string(), access_token, ip)
-
- try:
- last_seen = self.client_ip_last_seen.get(key)
- except KeyError:
- last_seen = None
-
- # Rate-limited inserts
- if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
- defer.returnValue(None)
-
- self.client_ip_last_seen.prefill(key, now)
-
- # It's safe not to lock here: a) no unique constraint,
- # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
- yield self._simple_upsert(
- "user_ips",
- keyvalues={
- "user_id": user.to_string(),
- "access_token": access_token,
- "ip": ip,
- "user_agent": user_agent,
- },
- values={
- "last_seen": now,
- },
- desc="insert_client_ip",
- lock=False,
- )
-
@defer.inlineCallbacks
def count_daily_users(self):
"""
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
new file mode 100644
index 0000000000..a90990e006
--- /dev/null
+++ b/synapse/storage/client_ips.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore, Cache
+
+from twisted.internet import defer
+
+
+# Number of msec of granularity to store the user IP 'last seen' time. Smaller
+# times give more inserts into the database even for readonly API hits
+# 120 seconds == 2 minutes
+LAST_SEEN_GRANULARITY = 120 * 1000
+
+
+class ClientIpStore(SQLBaseStore):
+
+ def __init__(self, hs):
+ self.client_ip_last_seen = Cache(
+ name="client_ip_last_seen",
+ keylen=4,
+ )
+
+ super(ClientIpStore, self).__init__(hs)
+
+ @defer.inlineCallbacks
+ def insert_client_ip(self, user, access_token, ip, user_agent):
+ now = int(self._clock.time_msec())
+ key = (user.to_string(), access_token, ip)
+
+ try:
+ last_seen = self.client_ip_last_seen.get(key)
+ except KeyError:
+ last_seen = None
+
+ # Rate-limited inserts
+ if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+ defer.returnValue(None)
+
+ self.client_ip_last_seen.prefill(key, now)
+
+ # It's safe not to lock here: a) no unique constraint,
+ # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
+ yield self._simple_upsert(
+ "user_ips",
+ keyvalues={
+ "user_id": user.to_string(),
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ },
+ values={
+ "last_seen": now,
+ },
+ desc="insert_client_ip",
+ lock=False,
+ )
--
cgit 1.5.1
From 0b3c80a234cd8f16c8714af7e7b719dc2e635b20 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 14:55:01 +0100
Subject: Use ClientIpStore to record client ips
---
synapse/app/synchrotron.py | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 7b45c87a96..0446a1643d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -27,6 +27,7 @@ from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.rest.client.v2_alpha import sync
+from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@@ -36,6 +37,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
@@ -119,13 +121,12 @@ class SynchrotronSlavedStore(
SlavedRegistrationStore,
SlavedFilteringStore,
SlavedPresenceStore,
+ BaseSlavedStore,
+ ClientIpStore, # After BaseSlavedStre because the constructor is different
):
def get_presence_list_accepted(self, user_localpart):
return ()
- def insert_client_ip(self, user, access_token, ip, user_agent):
- pass
-
# XXX: This is a bit broken because we don't persist forgotten rooms
# in a way that they can be streamed. This means that we don't have a
# way to invalidate the forgotten rooms cache correctly.
--
cgit 1.5.1
From da491e75b2d46c885f7fbb9240501c223e7c59bd Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 14:56:36 +0100
Subject: Appease flake8
---
synapse/app/synchrotron.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 0446a1643d..af06ce70d1 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -122,7 +122,7 @@ class SynchrotronSlavedStore(
SlavedFilteringStore,
SlavedPresenceStore,
BaseSlavedStore,
- ClientIpStore, # After BaseSlavedStre because the constructor is different
+ ClientIpStore, # After BaseSlavedStre because the constructor is different
):
def get_presence_list_accepted(self, user_localpart):
return ()
--
cgit 1.5.1
From 48340e4f13a8090feac070ebb507e7629d03b530 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 15:02:27 +0100
Subject: Clear the list of ongoing syncs on shutdown
---
synapse/app/synchrotron.py | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index af06ce70d1..f4b416f777 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -163,6 +163,8 @@ class SynchrotronPresence(object):
UPDATE_SYNCING_USERS_MS,
)
+ reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+
def set_state(self, user, state):
# TODO Hows this supposed to work?
pass
@@ -193,6 +195,13 @@ class SynchrotronPresence(object):
defer.returnValue(_user_syncing())
+ @defer.inlineCallbacks
+ def _on_shutdown(self):
+ # When the synchrotron is shutdown tell the master to clear the in
+ # progress syncs for this process
+ self.user_to_num_current_syncs.clear()
+ yield self._send_syncing_users_now()
+
def _send_syncing_users_regularly(self):
# Only send an update if we aren't in the middle of sending one.
if not self._sending_sync:
--
cgit 1.5.1
From 21961c93c72c5d99d44bf6a264b641c18ac0219b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 15:31:56 +0100
Subject: Bump changelog and version
---
CHANGES.rst | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
synapse/__init__.py | 2 +-
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/CHANGES.rst b/CHANGES.rst
index b027fb970c..776681de5f 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,53 @@
+Changes in synapse v0.16.0-rc1 (2016-06-03)
+===========================================
+
+Features:
+
+* Add email notifications for missed messages (PR #759, #786, #799, #810, #815,
+ #821)
+* Add a ``url_preview_ip_range_whitelist`` config param (PR #760)
+* Add /report endpoint (PR #762)
+* Add basic ignore user API (PR #763)
+* Add an openidish mechanism for proving that you own a given user_id (PR #765)
+* Allow clients to specify a server_name to avoid 'No known servers' (PR #794)
+* Add secondary_directory_servers option to fetch room list from other servers
+ (PR #808, #813)
+
+Changes:
+
+* Report per request metrics for all of the things using request_handler (PR
+ #756)
+* Correctly handle ``NULL`` password hashes from the database (PR #775)
+* Allow receipts for events we haven't seen in the db (PR #784)
+* Make synctl read a cache factor from config file (PR #785)
+* Increment badge count per missed convo, not per msg (PR #793)
+* Special case m.room.third_party_invite event auth to match invites (PR #814)
+
+
+Bug fixes:
+
+* Fix typo in event_auth servlet path (PR #757)
+* Fix password reset (PR #758)
+
+
+Performance improvements:
+
+* Reduce database inserts when sending transactions (PR #767)
+* Queue events by room for persistence (PR #768)
+* Add cache to ``get_user_by_id`` (PR #772)
+* Add and use ``get_domain_from_id`` (PR #773)
+* Use tree cache for ``get_linearized_receipts_for_room`` (PR #779)
+* Remove unused indices (PR #782)
+* Add caches to ``bulk_get_push_rules*`` (PR #804)
+* Cache ``get_event_reference_hashes`` (PR #806)
+* Add ``get_users_with_read_receipts_in_room`` cache (PR #809)
+* Use state to calculate ``get_users_in_room`` (PR #811)
+* Load push rules in storage layer so that they get cached (PR #825)
+* Make ``get_joined_hosts_for_room`` use get_users_in_room (PR #828)
+* Poke notifier on next reactor tick (PR #829)
+* Change CacheMetrics to be quicker (PR #830)
+
+
Changes in synapse v0.15.0-rc1 (2016-04-26)
===========================================
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 988318f5ea..3b290db79f 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.15.0-rc1"
+__version__ = "0.16.0-rc1"
--
cgit 1.5.1
From c11614bcdc9acf87388554e11f1c8d911bd85b57 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 15:50:15 +0100
Subject: Note that v0.15.x was never released
---
CHANGES.rst | 2 ++
1 file changed, 2 insertions(+)
diff --git a/CHANGES.rst b/CHANGES.rst
index 776681de5f..e77b31b583 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,6 +1,8 @@
Changes in synapse v0.16.0-rc1 (2016-06-03)
===========================================
+Version 0.15 was not released. See v0.15.0-rc1 below for additional changes.
+
Features:
* Add email notifications for missed messages (PR #759, #786, #799, #810, #815,
--
cgit 1.5.1
From 06d40c8b9841cd877e70e205d55a08f423ff2ec9 Mon Sep 17 00:00:00 2001
From: David Baker
Date: Fri, 3 Jun 2016 16:31:23 +0100
Subject: Add substitutions to email notif From
---
synapse/push/mailer.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 88402e42a6..933a53fc3e 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -186,7 +186,7 @@ class Mailer(object):
multipart_msg = MIMEMultipart('alternative')
multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
- multipart_msg['From'] = self.hs.config.email_notif_from
+ multipart_msg['From'] = self.hs.config.email_notif_from % (self.app_name, )
multipart_msg['To'] = email_address
multipart_msg['Date'] = email.utils.formatdate()
multipart_msg['Message-ID'] = email.utils.make_msgid()
--
cgit 1.5.1
From fbf608decbf85051379dc24446b1b6e89ff97e8c Mon Sep 17 00:00:00 2001
From: David Baker
Date: Fri, 3 Jun 2016 16:38:39 +0100
Subject: Oops, we're using the dict form
---
synapse/push/mailer.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 933a53fc3e..011bc4d2b1 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -186,7 +186,9 @@ class Mailer(object):
multipart_msg = MIMEMultipart('alternative')
multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
- multipart_msg['From'] = self.hs.config.email_notif_from % (self.app_name, )
+ multipart_msg['From'] = self.hs.config.email_notif_from % {
+ "app": self.app_name
+ }
multipart_msg['To'] = email_address
multipart_msg['Date'] = email.utils.formatdate()
multipart_msg['Message-ID'] = email.utils.make_msgid()
--
cgit 1.5.1
From 72c4d482e99d30fe96e2b24389629abe5b572626 Mon Sep 17 00:00:00 2001
From: David Baker
Date: Fri, 3 Jun 2016 16:39:50 +0100
Subject: 3rd time lucky: we'd already calculated it above
---
synapse/push/mailer.py | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 011bc4d2b1..e5c3929cd7 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -186,9 +186,7 @@ class Mailer(object):
multipart_msg = MIMEMultipart('alternative')
multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
- multipart_msg['From'] = self.hs.config.email_notif_from % {
- "app": self.app_name
- }
+ multipart_msg['From'] = from_string
multipart_msg['To'] = email_address
multipart_msg['Date'] = email.utils.formatdate()
multipart_msg['Message-ID'] = email.utils.make_msgid()
--
cgit 1.5.1
From 05e01f21d7012c1853ff566c8a76aa66087bfbd7 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 17:12:48 +0100
Subject: Remove event fetching from DB threads
---
synapse/replication/slave/storage/events.py | 5 -
synapse/storage/appservice.py | 21 +++--
synapse/storage/events.py | 138 ----------------------------
synapse/storage/room.py | 46 ++++++----
synapse/storage/search.py | 29 +++---
synapse/storage/stream.py | 34 +++----
tests/storage/test_appservice.py | 2 +-
7 files changed, 75 insertions(+), 200 deletions(-)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cbc1ae4190..877c68508c 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -131,15 +131,10 @@ class SlavedEventStore(BaseSlavedStore):
_get_events_from_cache = DataStore._get_events_from_cache.__func__
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
- _parse_events_txn = DataStore._parse_events_txn.__func__
- _get_events_txn = DataStore._get_events_txn.__func__
- _get_event_txn = DataStore._get_event_txn.__func__
_enqueue_events = DataStore._enqueue_events.__func__
_do_fetch = DataStore._do_fetch.__func__
- _fetch_events_txn = DataStore._fetch_events_txn.__func__
_fetch_event_rows = DataStore._fetch_event_rows.__func__
_get_event_from_row = DataStore._get_event_from_row.__func__
- _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index feb9d228ae..ffb7d4a25b 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -298,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
dict(txn_id=txn_id, as_id=service.id)
)
+ @defer.inlineCallbacks
def get_oldest_unsent_txn(self, service):
"""Get the oldest transaction which has not been sent for this
service.
@@ -308,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
A Deferred which resolves to an AppServiceTransaction or
None.
"""
- return self.runInteraction(
+ entry = yield self.runInteraction(
"get_oldest_unsent_appservice_txn",
self._get_oldest_unsent_txn,
service
)
+ if not entry:
+ defer.returnValue(None)
+
+ event_ids = json.loads(entry["event_ids"])
+
+ events = yield self.get_events(event_ids)
+
+ defer.returnValue(AppServiceTransaction(
+ service=service, id=entry["txn_id"], events=events
+ ))
+
def _get_oldest_unsent_txn(self, txn, service):
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
@@ -328,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
entry = rows[0]
- event_ids = json.loads(entry["event_ids"])
- events = self._get_events_txn(txn, event_ids)
-
- return AppServiceTransaction(
- service=service, id=entry["txn_id"], events=events
- )
+ return entry
def _get_last_txn(self, txn, service_id):
txn.execute(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2b3f79577b..b710505a7e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -762,41 +762,6 @@ class EventsStore(SQLBaseStore):
if e_id in event_map and event_map[e_id]
])
- def _get_events_txn(self, txn, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not event_ids:
- return []
-
- event_map = self._get_events_from_cache(
- event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- missing_events_ids = [e for e in event_ids if e not in event_map]
-
- if not missing_events_ids:
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
-
- missing_events = self._fetch_events_txn(
- txn,
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- event_map.update(missing_events)
-
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
-
def _invalidate_get_event_cache(self, event_id):
for check_redacted in (False, True):
for get_prev_content in (False, True):
@@ -804,18 +769,6 @@ class EventsStore(SQLBaseStore):
(event_id, check_redacted, get_prev_content)
)
- def _get_event_txn(self, txn, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
-
- events = self._get_events_txn(
- txn, [event_id],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- return events[0] if events else None
-
def _get_events_from_cache(self, events, check_redacted, get_prev_content,
allow_rejected):
event_map = {}
@@ -981,34 +934,6 @@ class EventsStore(SQLBaseStore):
return rows
- def _fetch_events_txn(self, txn, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not events:
- return {}
-
- rows = self._fetch_event_rows(
- txn, events,
- )
-
- if not allow_rejected:
- rows[:] = [r for r in rows if not r["rejects"]]
-
- res = [
- self._get_event_from_row_txn(
- txn,
- row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- rejected_reason=row["rejects"],
- )
- for row in rows
- ]
-
- return {
- r.event_id: r
- for r in res
- }
-
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False,
@@ -1070,69 +995,6 @@ class EventsStore(SQLBaseStore):
defer.returnValue(ev)
- def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
- rejected_reason=None):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = self._simple_select_one_onecol_txn(
- txn,
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- )
-
- ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
-
- if check_redacted and redacted:
- ev = prune_event(ev)
-
- redaction_id = self._simple_select_one_onecol_txn(
- txn,
- table="redactions",
- keyvalues={"redacts": ev.event_id},
- retcol="event_id",
- )
-
- ev.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
-
- because = self._get_event_txn(
- txn,
- redaction_id,
- check_redacted=False
- )
-
- if because:
- ev.unsigned["redacted_because"] = because
-
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = self._get_event_txn(
- txn,
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
-
- return ev
-
- def _parse_events_txn(self, txn, rows):
- event_ids = [r["event_id"] for r in rows]
-
- return self._get_events_txn(txn, event_ids)
-
@defer.inlineCallbacks
def count_daily_messages(self):
"""
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 26933e593a..97f9f1929c 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore):
@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
- def f(txn):
+ def get_room_name(txn):
sql = (
- "SELECT event_id FROM current_state_events "
- "WHERE room_id = ? "
+ "SELECT name FROM room_names"
+ " INNER JOIN current_state_events USING (room_id, event_id)"
+ " WHERE room_id = ?"
+ " LIMIT 1"
)
- sql += " AND ((type = 'm.room.name' AND state_key = '')"
- sql += " OR type = 'm.room.aliases')"
-
txn.execute(sql, (room_id,))
- results = self.cursor_to_dict(txn)
+ rows = txn.fetchall()
+ if rows:
+ return rows[0][0]
+ else:
+ return None
- return self._parse_events_txn(txn, results)
+ return [row[0] for row in txn.fetchall()]
- events = yield self.runInteraction("get_room_name_and_aliases", f)
+ def get_room_aliases(txn):
+ sql = (
+ "SELECT content FROM current_state_events"
+ " INNER JOIN events USING (room_id, event_id)"
+ " WHERE room_id = ?"
+ )
+ txn.execute(sql, (room_id,))
+ return [row[0] for row in txn.fetchall()]
+
+ name = yield self.runInteraction("get_room_name", get_room_name)
+ alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
- name = None
aliases = []
- for e in events:
- if e.type == 'm.room.name':
- if 'name' in e.content:
- name = e.content['name']
- elif e.type == 'm.room.aliases':
- if 'aliases' in e.content:
- aliases.extend(e.content['aliases'])
+ for c in alias_contents:
+ try:
+ content = json.loads(c)
+ except:
+ continue
+
+ aliases.extend(content.get('aliases', []))
defer.returnValue((name, aliases))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 0224299625..12941d1775 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
import re
+import ujson as json
logger = logging.getLogger(__name__)
@@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
- "SELECT stream_ordering, event_id FROM events"
+ "SELECT stream_ordering, event_id, room_id, type, content FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
@@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore):
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
- rows = txn.fetchall()
+ rows = self.cursor_to_dict(txn)
if not rows:
return 0
- min_stream_id = rows[-1][0]
- event_ids = [row[1] for row in rows]
-
- events = self._get_events_txn(txn, event_ids)
+ min_stream_id = rows[-1]["stream_ordering"]
event_search_rows = []
- for event in events:
+ for row in rows:
try:
- event_id = event.event_id
- room_id = event.room_id
- content = event.content
- if event.type == "m.room.message":
+ event_id = row["event_id"]
+ room_id = row["room_id"]
+ etype = row["type"]
+ try:
+ content = json.loads(row["content"])
+ except:
+ continue
+
+ if etype == "m.room.message":
key = "content.body"
value = content["body"]
- elif event.type == "m.room.topic":
+ elif etype == "m.room.topic":
key = "content.topic"
value = content["topic"]
- elif event.type == "m.room.name":
+ elif etype == "m.room.name":
key = "content.name"
value = content["name"]
except (KeyError, AttributeError):
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 95b12559a6..b9ad965fd6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -132,29 +132,25 @@ class StreamStore(SQLBaseStore):
return True
return False
- ret = self._get_events_txn(
- txn,
- # apply the filter on the room id list
- [
- r["event_id"] for r in rows
- if app_service_interested(r)
- ],
- get_prev_content=True
- )
+ return [r for r in rows if app_service_interested(r)]
- self._set_before_and_after(ret, rows)
+ rows = yield self.runInteraction("get_appservice_room_stream", f)
- if rows:
- key = "s%d" % max(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = to_key
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
- return ret, key
+ self._set_before_and_after(ret, rows, topo_order=from_id is None)
- results = yield self.runInteraction("get_appservice_room_stream", f)
- defer.returnValue(results)
+ if rows:
+ key = "s%d" % max(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = to_key
+
+ defer.returnValue((ret, key))
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 5734198121..f44c4870e3 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
# we aren't testing store._base stuff here, so mock this out
- self.store._get_events_txn = Mock(return_value=events)
+ self.store.get_events = Mock(return_value=events)
yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
yield self._insert_txn(service.id, 10, events)
--
cgit 1.5.1
From 10ea3f46ba3eda2f7c220a5e5902b687feb3042c Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 17:55:32 +0100
Subject: Change the way we cache events
---
synapse/storage/events.py | 80 ++++++++++++++++++++++++-----------------------
1 file changed, 41 insertions(+), 39 deletions(-)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b710505a7e..779743b8f9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -139,6 +139,9 @@ class _EventPeristenceQueue(object):
pass
+_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+
+
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
@@ -741,7 +744,6 @@ class EventsStore(SQLBaseStore):
event_map = self._get_events_from_cache(
event_ids,
check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
@@ -751,40 +753,49 @@ class EventsStore(SQLBaseStore):
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
event_map.update(missing_events)
- defer.returnValue([
+ events = [
event_map[e_id] for e_id in event_id_list
if e_id in event_map and event_map[e_id]
- ])
+ ]
+
+ if get_prev_content:
+ for event in events:
+ if "replaces_state" in event.unsigned:
+ prev = yield self.get_event(
+ event.unsigned["replaces_state"],
+ get_prev_content=False,
+ allow_none=True,
+ )
+ if prev:
+ event.unsigned = dict(event.unsigned)
+ event.unsigned["prev_content"] = prev.content
+ event.unsigned["prev_sender"] = prev.sender
+
+ defer.returnValue(events)
def _invalidate_get_event_cache(self, event_id):
- for check_redacted in (False, True):
- for get_prev_content in (False, True):
- self._get_event_cache.invalidate(
- (event_id, check_redacted, get_prev_content)
- )
+ self._get_event_cache.invalidate((event_id,))
- def _get_events_from_cache(self, events, check_redacted, get_prev_content,
- allow_rejected):
+ def _get_events_from_cache(self, events, check_redacted, allow_rejected):
event_map = {}
for event_id in events:
- try:
- ret = self._get_event_cache.get(
- (event_id, check_redacted, get_prev_content,)
- )
+ ret = self._get_event_cache.get((event_id,), None)
+ if not ret:
+ continue
- if allow_rejected or not ret.rejected_reason:
- event_map[event_id] = ret
+ if allow_rejected or not ret.event.rejected_reason:
+ if check_redacted and ret.redacted_event:
+ event_map[event_id] = ret.redacted_event
else:
- event_map[event_id] = None
- except KeyError:
- pass
+ event_map[event_id] = ret.event
+ else:
+ event_map[event_id] = None
return event_map
@@ -855,8 +866,7 @@ class EventsStore(SQLBaseStore):
reactor.callFromThread(fire, event_list)
@defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
+ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
@@ -895,7 +905,6 @@ class EventsStore(SQLBaseStore):
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
check_redacted=check_redacted,
- get_prev_content=get_prev_content,
rejected_reason=row["rejects"],
)
for row in rows
@@ -936,8 +945,7 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
- rejected_reason=None):
+ check_redacted=True, rejected_reason=None):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -949,14 +957,17 @@ class EventsStore(SQLBaseStore):
desc="_get_event_from_row",
)
- ev = FrozenEvent(
+ original_ev = FrozenEvent(
d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
+ ev = original_ev
+ redacted_event = None
if check_redacted and redacted:
ev = prune_event(ev)
+ redacted_event = ev
redaction_id = yield self._simple_select_one_onecol(
table="redactions",
@@ -979,19 +990,10 @@ class EventsStore(SQLBaseStore):
# will serialise this field correctly
ev.unsigned["redacted_because"] = because
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = yield self.get_event(
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- allow_none=True,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
+ self._get_event_cache.prefill((ev.event_id,), _EventCacheEntry(
+ event=original_ev,
+ redacted_event=redacted_event,
+ ))
defer.returnValue(ev)
--
cgit 1.5.1
From 8f79084bd44f76223048c1bd6d836f904edcc95e Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 18:03:40 +0100
Subject: Add get_presence_list_accepted to the broken caches in synchrotron
---
synapse/app/synchrotron.py | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index f4b416f777..c77854fab1 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
-from synapse.storage.presence import UserPresenceState
+from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
@@ -124,9 +124,6 @@ class SynchrotronSlavedStore(
BaseSlavedStore,
ClientIpStore, # After BaseSlavedStre because the constructor is different
):
- def get_presence_list_accepted(self, user_localpart):
- return ()
-
# XXX: This is a bit broken because we don't persist forgotten rooms
# in a way that they can be streamed. This means that we don't have a
# way to invalidate the forgotten rooms cache correctly.
@@ -136,6 +133,13 @@ class SynchrotronSlavedStore(
RoomMemberStore.__dict__["who_forgot_in_room"]
)
+ # XXX: This is a bit broken because we don't persist the accepted list in a
+ # way that can be replicated. This means that we don't have a way to
+ # invalidate the cache correctly.
+ get_presence_list_accepted = PresenceStore.__dict__[
+ "get_presence_list_accepted"
+ ]
+
UPDATE_SYNCING_USERS_MS = 10 * 1000
@@ -357,6 +361,7 @@ class SynchrotronServer(HomeServer):
def expire_broken_caches():
store.who_forgot_in_room.invalidate_all()
+ store.get_presence_list_accepted.invalidate_all()
def notify_from_stream(
result, stream_name, stream_key, room=None, user=None
--
cgit 1.5.1
From ac9716f1546ae486cac435b8a577cc2c54b666d6 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Fri, 3 Jun 2016 18:10:00 +0100
Subject: Fix spelling
---
synapse/app/synchrotron.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index c77854fab1..aa81e1c5da 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -122,7 +122,7 @@ class SynchrotronSlavedStore(
SlavedFilteringStore,
SlavedPresenceStore,
BaseSlavedStore,
- ClientIpStore, # After BaseSlavedStre because the constructor is different
+ ClientIpStore, # After BaseSlavedStore because the constructor is different
):
# XXX: This is a bit broken because we don't persist forgotten rooms
# in a way that they can be streamed. This means that we don't have a
--
cgit 1.5.1
From cffe46408f40db082df76adc263cf5014031ae54 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 3 Jun 2016 18:25:21 +0100
Subject: Don't rely on options when inserting event into cache
---
synapse/storage/events.py | 83 ++++++++++++++++++++++++-----------------------
1 file changed, 43 insertions(+), 40 deletions(-)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 779743b8f9..5db24e86f9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -741,13 +741,12 @@ class EventsStore(SQLBaseStore):
event_id_list = event_ids
event_ids = set(event_ids)
- event_map = self._get_events_from_cache(
+ event_entry_map = self._get_events_from_cache(
event_ids,
- check_redacted=check_redacted,
allow_rejected=allow_rejected,
)
- missing_events_ids = [e for e in event_ids if e not in event_map]
+ missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
missing_events = yield self._enqueue_events(
@@ -756,32 +755,40 @@ class EventsStore(SQLBaseStore):
allow_rejected=allow_rejected,
)
- event_map.update(missing_events)
+ event_entry_map.update(missing_events)
- events = [
- event_map[e_id] for e_id in event_id_list
- if e_id in event_map and event_map[e_id]
- ]
+ events = []
+ for event_id in event_id_list:
+ entry = event_entry_map.get(event_id, None)
+ if not entry:
+ continue
- if get_prev_content:
- for event in events:
- if "replaces_state" in event.unsigned:
- prev = yield self.get_event(
- event.unsigned["replaces_state"],
- get_prev_content=False,
- allow_none=True,
- )
- if prev:
- event.unsigned = dict(event.unsigned)
- event.unsigned["prev_content"] = prev.content
- event.unsigned["prev_sender"] = prev.sender
+ if allow_rejected or not entry.event.rejected_reason:
+ if check_redacted and entry.redacted_event:
+ event = entry.redacted_event
+ else:
+ event = entry.event
+
+ events.append(event)
+
+ if get_prev_content:
+ if "replaces_state" in event.unsigned:
+ prev = yield self.get_event(
+ event.unsigned["replaces_state"],
+ get_prev_content=False,
+ allow_none=True,
+ )
+ if prev:
+ event.unsigned = dict(event.unsigned)
+ event.unsigned["prev_content"] = prev.content
+ event.unsigned["prev_sender"] = prev.sender
defer.returnValue(events)
def _invalidate_get_event_cache(self, event_id):
self._get_event_cache.invalidate((event_id,))
- def _get_events_from_cache(self, events, check_redacted, allow_rejected):
+ def _get_events_from_cache(self, events, allow_rejected):
event_map = {}
for event_id in events:
@@ -790,10 +797,7 @@ class EventsStore(SQLBaseStore):
continue
if allow_rejected or not ret.event.rejected_reason:
- if check_redacted and ret.redacted_event:
- event_map[event_id] = ret.redacted_event
- else:
- event_map[event_id] = ret.event
+ event_map[event_id] = ret
else:
event_map[event_id] = None
@@ -904,7 +908,6 @@ class EventsStore(SQLBaseStore):
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
rejected_reason=row["rejects"],
)
for row in rows
@@ -913,7 +916,7 @@ class EventsStore(SQLBaseStore):
)
defer.returnValue({
- e.event_id: e
+ e.event.event_id: e
for e in res if e
})
@@ -945,7 +948,7 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- check_redacted=True, rejected_reason=None):
+ rejected_reason=None):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -954,7 +957,7 @@ class EventsStore(SQLBaseStore):
table="rejections",
keyvalues={"event_id": rejected_reason},
retcol="reason",
- desc="_get_event_from_row",
+ desc="_get_event_from_row_rejected_reason",
)
original_ev = FrozenEvent(
@@ -963,20 +966,18 @@ class EventsStore(SQLBaseStore):
rejected_reason=rejected_reason,
)
- ev = original_ev
redacted_event = None
- if check_redacted and redacted:
- ev = prune_event(ev)
- redacted_event = ev
+ if redacted:
+ redacted_event = prune_event(original_ev)
redaction_id = yield self._simple_select_one_onecol(
table="redactions",
- keyvalues={"redacts": ev.event_id},
+ keyvalues={"redacts": redacted_event.event_id},
retcol="event_id",
- desc="_get_event_from_row",
+ desc="_get_event_from_row_redactions",
)
- ev.unsigned["redacted_by"] = redaction_id
+ redacted_event.unsigned["redacted_by"] = redaction_id
# Get the redaction event.
because = yield self.get_event(
@@ -988,14 +989,16 @@ class EventsStore(SQLBaseStore):
if because:
# It's fine to do add the event directly, since get_pdu_json
# will serialise this field correctly
- ev.unsigned["redacted_because"] = because
+ redacted_event.unsigned["redacted_because"] = because
- self._get_event_cache.prefill((ev.event_id,), _EventCacheEntry(
+ cache_entry = _EventCacheEntry(
event=original_ev,
redacted_event=redacted_event,
- ))
+ )
+
+ self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
- defer.returnValue(ev)
+ defer.returnValue(cache_entry)
@defer.inlineCallbacks
def count_daily_messages(self):
--
cgit 1.5.1
From 70aee0717c22acf7eabb5f158cbaf527137bc90e Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Jun 2016 11:08:12 +0100
Subject: Add events to cache when we persist them
---
synapse/storage/events.py | 41 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 41 insertions(+)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5db24e86f9..16398dc0a8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -635,6 +635,8 @@ class EventsStore(SQLBaseStore):
],
)
+ self._add_to_cache(txn, events_and_contexts)
+
if backfilled:
# Backfilled events come before the current state so we don't need
# to update the current state table
@@ -676,6 +678,45 @@ class EventsStore(SQLBaseStore):
return
+ def _add_to_cache(self, txn, events_and_contexts):
+ to_prefill = []
+
+ rows = []
+ N = 200
+ for i in range(0, len(events_and_contexts), N):
+ ev_map = {
+ e[0].event_id: e[0]
+ for e in events_and_contexts[i:i + N]
+ }
+ if not ev_map:
+ break
+
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE e.event_id IN (%s)"
+ ) % (",".join(["?"] * len(ev_map)),)
+
+ txn.execute(sql, ev_map.keys())
+ rows = self.cursor_to_dict(txn)
+ for row in rows:
+ event = ev_map[row["event_id"]]
+ if not row["rejects"] and not row["redacts"]:
+ to_prefill.append(_EventCacheEntry(
+ event=event,
+ redacted_event=None,
+ ))
+
+ def prefill():
+ for cache_entry in to_prefill:
+ self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+ txn.call_after(prefill)
+
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
--
cgit 1.5.1
From 7aa778fba9bb81087c3a1029e0a0d4ff55b1a065 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Jun 2016 11:58:09 +0100
Subject: Add metric counter for number of persisted events
---
synapse/storage/events.py | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5db24e86f9..ff4f742f6a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json
from collections import deque, namedtuple
+import synapse
+import synapse.metrics
+
import logging
import math
@@ -35,6 +38,10 @@ import ujson as json
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
def encode_json(json_object):
if USE_FROZEN_DICTS:
# ujson doesn't like frozen_dicts
@@ -261,6 +268,7 @@ class EventsStore(SQLBaseStore):
events_and_contexts=chunk,
backfilled=backfilled,
)
+ persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks
@log_function
@@ -278,6 +286,7 @@ class EventsStore(SQLBaseStore):
current_state=current_state,
backfilled=backfilled,
)
+ persist_event_counter.inc()
except _RollbackButIsFineException:
pass
--
cgit 1.5.1
From 377eb480ca66a376e85cf8927f7f9112ed60e8bc Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Jun 2016 15:14:21 +0100
Subject: Fire after 30s not 8h
---
synapse/handlers/presence.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 0e19f777b8..2e772da660 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -183,7 +183,7 @@ class PresenceHandler(object):
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
self.clock.call_later(
- 30 * 1000,
+ 30,
self.clock.looping_call,
self._handle_timeouts,
5000,
--
cgit 1.5.1
From 96dc600579cd6ef9937b0e007f51aa4da0fc122d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Jun 2016 15:44:41 +0100
Subject: Fix typos
---
synapse/handlers/presence.py | 68 +++++++++++++++++++++++---------------------
1 file changed, 36 insertions(+), 32 deletions(-)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2e772da660..94160a5be7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -283,44 +283,48 @@ class PresenceHandler(object):
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
+ logger.info("Handling presence timeouts")
now = self.clock.time_msec()
- with Measure(self.clock, "presence_handle_timeouts"):
- # Fetch the list of users that *may* have timed out. Things may have
- # changed since the timeout was set, so we won't necessarily have to
- # take any action.
- users_to_check = set(self.wheel_timer.fetch(now))
-
- # Check whether the lists of syncing processes from an external
- # process have expired.
- expired_process_ids = [
- process_id for process_id, last_update
- in self.external_process_last_update.items()
- if now - last_update > EXTERNAL_PROCESS_EXPIRY
- ]
- for process_id in expired_process_ids:
- users_to_check.update(
- self.external_process_to_current_syncs.pop(process_id, ())
- )
- self.external_process_last_update.pop(process_id)
+ try:
+ with Measure(self.clock, "presence_handle_timeouts"):
+ # Fetch the list of users that *may* have timed out. Things may have
+ # changed since the timeout was set, so we won't necessarily have to
+ # take any action.
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_updated_ms.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_last_updated_ms.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
- states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
- for user_id in users_to_check
- ]
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in users_to_check
+ ]
- timers_fired_counter.inc_by(len(states))
+ timers_fired_counter.inc_by(len(states))
- changes = handle_timeouts(
- states,
- is_mine_fn=self.is_mine_id,
- syncing_users=self.get_syncing_users(),
- now=now,
- )
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.is_mine_id,
+ syncing_user_ids=self.get_currently_syncing_users(),
+ now=now,
+ )
- preserve_fn(self._update_states)(changes)
+ preserve_fn(self._update_states)(changes)
+ except:
+ logger.exception("Exception in _handle_timeouts loop")
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
--
cgit 1.5.1
From 216a05b3e39e08b0600a39fc111b4d669d06ff7c Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Jun 2016 16:00:09 +0100
Subject: .values() returns list of sets
---
synapse/handlers/presence.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 94160a5be7..6b70fa3817 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -406,7 +406,8 @@ class PresenceHandler(object):
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
- syncing_user_ids.update(self.external_process_to_current_syncs.values())
+ for user_ids in self.external_process_to_current_syncs.values():
+ syncing_user_ids.update(user_ids)
return syncing_user_ids
@defer.inlineCallbacks
--
cgit 1.5.1
From 5ef84da4f11f1b1cceb0c44d9867bb597ee68e64 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Mon, 6 Jun 2016 16:05:28 +0100
Subject: Yield on the sleeps intended to backoff replication
---
synapse/app/pusher.py | 2 +-
synapse/app/synchrotron.py | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f1de1e7ce9..3c3fa38053 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -311,7 +311,7 @@ class PusherServer(HomeServer):
poke_pushers(result)
except:
logger.exception("Error replicating from %r", replication_url)
- sleep(30)
+ yield sleep(30)
def setup(config_options):
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index aa81e1c5da..7273055cc1 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -443,7 +443,7 @@ class SynchrotronServer(HomeServer):
notify(result)
except:
logger.exception("Error replicating from %r", replication_url)
- sleep(5)
+ yield sleep(5)
def build_presence_handler(self):
return SynchrotronPresence(self)
--
cgit 1.5.1
From 4a5bbb1941ae63f1d6632aa35e80274e56c8dbb9 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Mon, 6 Jun 2016 16:37:12 +0100
Subject: Fix a KeyError in the synchrotron presence
---
synapse/app/synchrotron.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index aa81e1c5da..3d0d5cc15a 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -187,7 +187,10 @@ class SynchrotronPresence(object):
yield self._send_syncing_users_now()
def _end():
- if affect_presence:
+ # We check that the user_id is in user_to_num_current_syncs because
+ # user_to_num_current_syncs may have been cleared if we are
+ # shutting down.
+ if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1
@contextlib.contextmanager
--
cgit 1.5.1
From 310197bab5cf8ed2c26fae522f15f092dbcdff58 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 09:34:50 +0100
Subject: Fix AS retries
---
synapse/storage/appservice.py | 4 ++--
tests/storage/test_appservice.py | 6 +++---
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index ffb7d4a25b..a281571637 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -320,10 +320,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
event_ids = json.loads(entry["event_ids"])
- events = yield self.get_events(event_ids)
+ event_map = yield self.get_events(event_ids)
defer.returnValue(AppServiceTransaction(
- service=service, id=entry["txn_id"], events=events
+ service=service, id=entry["txn_id"], events=event_map.values()
))
def _get_oldest_unsent_txn(self, txn, service):
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index f44c4870e3..6db4b966db 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -353,21 +353,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_get_oldest_unsent_txn(self):
service = Mock(id=self.as_list[0]["id"])
- events = [Mock(event_id="e1"), Mock(event_id="e2")]
+ events = {"e1": Mock(event_id="e1"), "e2": Mock(event_id="e2")}
other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
# we aren't testing store._base stuff here, so mock this out
self.store.get_events = Mock(return_value=events)
yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
- yield self._insert_txn(service.id, 10, events)
+ yield self._insert_txn(service.id, 10, events.values())
yield self._insert_txn(service.id, 11, other_events)
yield self._insert_txn(service.id, 12, other_events)
txn = yield self.store.get_oldest_unsent_txn(service)
self.assertEquals(service, txn.service)
self.assertEquals(10, txn.id)
- self.assertEquals(events, txn.events)
+ self.assertEquals(events.values(), txn.events)
@defer.inlineCallbacks
def test_get_appservices_by_state_single(self):
--
cgit 1.5.1
From 84379062f9ec259abc302af321d4ed8f5a958c01 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 10:24:50 +0100
Subject: Fix AS retries, but with correct ordering
---
synapse/storage/appservice.py | 4 ++--
tests/storage/test_appservice.py | 8 ++++----
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index a281571637..d1ee533fac 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -320,10 +320,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
event_ids = json.loads(entry["event_ids"])
- event_map = yield self.get_events(event_ids)
+ events = yield self._get_events(event_ids)
defer.returnValue(AppServiceTransaction(
- service=service, id=entry["txn_id"], events=event_map.values()
+ service=service, id=entry["txn_id"], events=events
))
def _get_oldest_unsent_txn(self, txn, service):
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 6db4b966db..3e2862daae 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -353,21 +353,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_get_oldest_unsent_txn(self):
service = Mock(id=self.as_list[0]["id"])
- events = {"e1": Mock(event_id="e1"), "e2": Mock(event_id="e2")}
+ events = [Mock(event_id="e1"), Mock(event_id="e2")]
other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
# we aren't testing store._base stuff here, so mock this out
- self.store.get_events = Mock(return_value=events)
+ self.store._get_events = Mock(return_value=events)
yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
- yield self._insert_txn(service.id, 10, events.values())
+ yield self._insert_txn(service.id, 10, events)
yield self._insert_txn(service.id, 11, other_events)
yield self._insert_txn(service.id, 12, other_events)
txn = yield self.store.get_oldest_unsent_txn(service)
self.assertEquals(service, txn.service)
self.assertEquals(10, txn.id)
- self.assertEquals(events.values(), txn.events)
+ self.assertEquals(events, txn.events)
@defer.inlineCallbacks
def test_get_appservices_by_state_single(self):
--
cgit 1.5.1
From 88625db05f274ad855fb51b33c84c09c947a6bd0 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 7 Jun 2016 11:33:36 +0100
Subject: Notify users for events in rooms they join.
Change how the notifier updates the map from room_id to user streams on
receiving a join event. Make it update the map when it notifies for the
join event, rather than using the "user_joined_room" distributor signal
---
synapse/notifier.py | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index cbec4d30ae..30883a0696 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,7 +14,7 @@
# limitations under the License.
from twisted.internet import defer
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.util.logutils import log_function
@@ -152,10 +152,6 @@ class Notifier(object):
self.appservice_handler = hs.get_application_service_handler()
self.state_handler = hs.get_state_handler()
- hs.get_distributor().observe(
- "user_joined_room", self._user_joined_room
- )
-
self.clock.looping_call(
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
)
@@ -248,6 +244,9 @@ class Notifier(object):
)
app_streams |= app_user_streams
+ if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+ self._user_joined_room(event.state_key, event.room_id)
+
self.on_new_event(
"room_key", room_stream_id,
users=extra_users,
@@ -483,9 +482,8 @@ class Notifier(object):
user_stream.appservice, set()
).add(user_stream)
- def _user_joined_room(self, user, room_id):
- user = str(user)
- new_user_stream = self.user_to_user_stream.get(user)
+ def _user_joined_room(self, user_id, room_id):
+ new_user_stream = self.user_to_user_stream.get(user_id)
if new_user_stream is not None:
room_streams = self.room_to_user_streams.setdefault(room_id, set())
room_streams.add(new_user_stream)
--
cgit 1.5.1
From 75331c5fca6d2207094b8cbf0b3bb34cc52a4ec4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 13:33:13 +0100
Subject: Change the way we do stats
---
synapse/metrics/__init__.py | 10 +++-------
1 file changed, 3 insertions(+), 7 deletions(-)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index f317034b8f..ef14bcd840 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -153,11 +153,7 @@ reactor_metrics = get_metrics_for("reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
-gc_time = (
- reactor_metrics.register_distribution("gc_time_gen0"),
- reactor_metrics.register_distribution("gc_time_gen2"),
- reactor_metrics.register_distribution("gc_time_gen2"),
-)
+gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
def runUntilCurrentTimer(func):
@@ -190,7 +186,7 @@ def runUntilCurrentTimer(func):
# one if necessary.
threshold = gc.get_threshold()
counts = gc.get_count()
- for i in [2, 1, 0]:
+ for i in (0, 1, 2):
if threshold[i] < counts[i]:
logger.info("Collecting gc %d", i)
@@ -198,7 +194,7 @@ def runUntilCurrentTimer(func):
gc.collect(i)
end = time.time() * 1000
- gc_time[i].inc_by(end - start)
+ gc_time.inc_by(end - start, i)
return ret
--
cgit 1.5.1
From 48e65099b52383743a47844b6369e173b9a96f90 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 13:40:22 +0100
Subject: Also record number of unreachable objects
---
synapse/metrics/__init__.py | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef14bcd840..b29cec3de1 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -154,6 +154,7 @@ tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
+gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])
def runUntilCurrentTimer(func):
@@ -186,15 +187,16 @@ def runUntilCurrentTimer(func):
# one if necessary.
threshold = gc.get_threshold()
counts = gc.get_count()
- for i in (0, 1, 2):
+ for i in (2, 1, 0):
if threshold[i] < counts[i]:
logger.info("Collecting gc %d", i)
start = time.time() * 1000
- gc.collect(i)
+ unreachable = gc.collect(i)
end = time.time() * 1000
gc_time.inc_by(end - start, i)
+ gc_unreachable.inc_by(unreachable, i)
return ret
--
cgit 1.5.1
From 0b2158719c43eab87ab7a9448ae1d85008b92b92 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 7 Jun 2016 15:07:11 +0100
Subject: Remove dead code.
Loading push rules now happens in the datastore, so we can remove
the methods that loaded them outside the datastore.
The ``waiting_for_join_list`` in federation handler is populated by
anything, so can be removed.
The ``_get_members_events_txn`` method isn't called from anywhere
so can be removed.
---
synapse/handlers/federation.py | 13 -------------
synapse/push/bulk_push_rule_evaluator.py | 8 --------
synapse/push/clientformat.py | 26 --------------------------
synapse/storage/roommember.py | 7 -------
4 files changed, 54 deletions(-)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 648a505e65..ff83c608e7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -66,10 +66,6 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.distributor.observe("user_joined_room", self.user_joined_room)
-
- self.waiting_for_join_list = {}
-
self.store = hs.get_datastore()
self.replication_layer = hs.get_replication_layer()
self.state_handler = hs.get_state_handler()
@@ -1091,15 +1087,6 @@ class FederationHandler(BaseHandler):
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
- @log_function
- def user_joined_room(self, user, room_id):
- waiters = self.waiting_for_join_list.get(
- (user.to_string(), room_id),
- []
- )
- while waiters:
- waiters.pop().callback(None)
-
@defer.inlineCallbacks
@log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None,
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 6e42121b1d..756e5da513 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -14,7 +14,6 @@
# limitations under the License.
import logging
-import ujson as json
from twisted.internet import defer
@@ -27,13 +26,6 @@ from synapse.visibility import filter_events_for_clients
logger = logging.getLogger(__name__)
-def decode_rule_json(rule):
- rule = dict(rule)
- rule['conditions'] = json.loads(rule['conditions'])
- rule['actions'] = json.loads(rule['actions'])
- return rule
-
-
@defer.inlineCallbacks
def _get_rules(room_id, user_ids, store):
rules_by_user = yield store.bulk_get_push_rules(user_ids)
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index b3983f7940..e0331b2d2d 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -13,37 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.push.baserules import list_with_base_rules
-
from synapse.push.rulekinds import (
PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
)
import copy
-import simplejson as json
-
-
-def load_rules_for_user(user, rawrules, enabled_map):
- ruleslist = []
- for rawrule in rawrules:
- rule = dict(rawrule)
- rule["conditions"] = json.loads(rawrule["conditions"])
- rule["actions"] = json.loads(rawrule["actions"])
- ruleslist.append(rule)
-
- # We're going to be mutating this a lot, so do a deep copy
- rules = list(list_with_base_rules(ruleslist))
-
- for i, rule in enumerate(rules):
- rule_id = rule['rule_id']
- if rule_id in enabled_map:
- if rule.get('enabled', True) != bool(enabled_map[rule_id]):
- # Rules are cached across users.
- rule = dict(rule)
- rule['enabled'] = bool(enabled_map[rule_id])
- rules[i] = rule
-
- return rules
def format_push_rules_for_user(user, ruleslist):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 64b4bd371b..8bd693be72 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -243,13 +243,6 @@ class RoomMemberStore(SQLBaseStore):
user_ids = yield self.get_users_in_room(room_id)
defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids))
- def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
- rows = self._get_members_rows_txn(
- txn,
- room_id, membership, user_id,
- )
- return [r["event_id"] for r in rows]
-
def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
where_clause = "c.room_id = ?"
where_values = [room_id]
--
cgit 1.5.1
From dded389ac16ec023c986df400d25ca94a4a28677 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 15:45:56 +0100
Subject: Allow setting of gc.set_thresholds
---
synapse/app/homeserver.py | 5 +++++
synapse/app/pusher.py | 5 +++++
synapse/app/synchrotron.py | 15 ++++++++++-----
synapse/config/server.py | 19 ++++++++++++++++++-
4 files changed, 38 insertions(+), 6 deletions(-)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index df675c0ed4..22e1721fc4 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -16,6 +16,7 @@
import synapse
+import gc
import logging
import os
import sys
@@ -351,6 +352,8 @@ class SynapseService(service.Service):
def startService(self):
hs = setup(self.config)
change_resource_limit(hs.config.soft_file_limit)
+ if hs.config.gc_thresholds:
+ gc.set_threshold(*hs.config.gc_thresholds)
def stopService(self):
return self._port.stopListening()
@@ -422,6 +425,8 @@ def run(hs):
# sys.settrace(logcontext_tracer)
with LoggingContext("run"):
change_resource_limit(hs.config.soft_file_limit)
+ if hs.config.gc_thresholds:
+ gc.set_threshold(*hs.config.gc_thresholds)
reactor.run()
if hs.config.daemonize:
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 3c3fa38053..7e2bf7ecc2 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -43,6 +43,7 @@ from twisted.web.resource import Resource
from daemonize import Daemonize
+import gc
import sys
import logging
@@ -342,6 +343,8 @@ def setup(config_options):
ps.start_listening()
change_resource_limit(ps.config.soft_file_limit)
+ if ps.config.gc_thresholds:
+ gc.set_threshold(*ps.config.gc_thresholds)
def start():
ps.replicate()
@@ -361,6 +364,8 @@ if __name__ == '__main__':
def run():
with LoggingContext("run"):
change_resource_limit(ps.config.soft_file_limit)
+ if ps.config.gc_thresholds:
+ gc.set_threshold(*ps.config.gc_thresholds)
reactor.run()
daemon = Daemonize(
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 5c552ffb29..f9673ab8d8 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -57,6 +57,7 @@ from daemonize import Daemonize
import sys
import logging
import contextlib
+import gc
import ujson as json
logger = logging.getLogger("synapse.app.synchrotron")
@@ -484,6 +485,8 @@ def setup(config_options):
ss.start_listening()
change_resource_limit(ss.config.soft_file_limit)
+ if ss.config.gc_thresholds:
+ ss.set_threshold(*ss.config.gc_thresholds)
def start():
ss.get_datastore().start_profiling()
@@ -496,17 +499,19 @@ def setup(config_options):
if __name__ == '__main__':
with LoggingContext("main"):
- ps = setup(sys.argv[1:])
+ ss = setup(sys.argv[1:])
- if ps.config.daemonize:
+ if ss.config.daemonize:
def run():
with LoggingContext("run"):
- change_resource_limit(ps.config.soft_file_limit)
+ change_resource_limit(ss.config.soft_file_limit)
+ if ss.config.gc_thresholds:
+ gc.set_threshold(*ss.config.gc_thresholds)
reactor.run()
daemon = Daemonize(
- app="synapse-pusher",
- pid=ps.config.pid_file,
+ app="synapse-synchrotron",
+ pid=ss.config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
diff --git a/synapse/config/server.py b/synapse/config/server.py
index c2d8f8a52f..44b8d422e0 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import Config
+from ._base import Config, ConfigError
class ServerConfig(Config):
@@ -38,6 +38,20 @@ class ServerConfig(Config):
self.listeners = config.get("listeners", [])
+ thresholds = config.get("gc_thresholds", None)
+ if thresholds is not None:
+ try:
+ assert len(thresholds) == 3
+ self.gc_thresholds = (
+ int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+ )
+ except:
+ raise ConfigError(
+ "Value of `gc_threshold` must be a list of three integers if set"
+ )
+ else:
+ self.gc_thresholds = None
+
bind_port = config.get("bind_port")
if bind_port:
self.listeners = []
@@ -157,6 +171,9 @@ class ServerConfig(Config):
# hard limit.
soft_file_limit: 0
+ # The GC threshold parameters to pass to `gc.set_threshold`, if defined
+ # gc_thresholds: [700, 10, 10]
+
# A list of other Home Servers to fetch the public room directory from
# and include in the public room directory of this home server
# This is a temporary stopgap solution to populate new server with a
--
cgit 1.5.1
From 2d1d1025fac846e2746dc627c0ebb6542c1488d3 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 16:26:25 +0100
Subject: Add gc_threshold to pusher and synchrotron
---
synapse/app/pusher.py | 14 ++++++++++++++
synapse/app/synchrotron.py | 14 ++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 7e2bf7ecc2..4ec23d84c1 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -65,6 +65,20 @@ class SlaveConfig(DatabaseConfig):
self.pid_file = self.abspath(config.get("pid_file"))
self.public_baseurl = config["public_baseurl"]
+ thresholds = config.get("gc_thresholds", None)
+ if thresholds is not None:
+ try:
+ assert len(thresholds) == 3
+ self.gc_thresholds = (
+ int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+ )
+ except:
+ raise ConfigError(
+ "Value of `gc_threshold` must be a list of three integers if set"
+ )
+ else:
+ self.gc_thresholds = None
+
# some things used by the auth handler but not actually used in the
# pusher codebase
self.bcrypt_rounds = None
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index f9673ab8d8..297e199453 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -78,6 +78,20 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
self.macaroon_secret_key = config["macaroon_secret_key"]
self.expire_access_token = config.get("expire_access_token", False)
+ thresholds = config.get("gc_thresholds", None)
+ if thresholds is not None:
+ try:
+ assert len(thresholds) == 3
+ self.gc_thresholds = (
+ int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+ )
+ except:
+ raise ConfigError(
+ "Value of `gc_threshold` must be a list of three integers if set"
+ )
+ else:
+ self.gc_thresholds = None
+
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("synchroton.pid")
return """\
--
cgit 1.5.1
From 64935d11f7730702cafba8591512ddb57e8fadf1 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 7 Jun 2016 16:35:28 +0100
Subject: Add script for running sytest with dendron
---
jenkins-dendron-postgres.sh | 84 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 84 insertions(+)
create mode 100755 jenkins-dendron-postgres.sh
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
new file mode 100755
index 0000000000..8e3a4c51a9
--- /dev/null
+++ b/jenkins-dendron-postgres.sh
@@ -0,0 +1,84 @@
+#!/bin/bash
+
+set -eux
+
+: ${WORKSPACE:="$(pwd)"}
+
+export PYTHONDONTWRITEBYTECODE=yep
+export SYNAPSE_CACHE_FACTOR=1
+
+# Output test results as junit xml
+export TRIAL_FLAGS="--reporter=subunit"
+export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
+# Write coverage reports to a separate file for each process
+export COVERAGE_OPTS="-p"
+export DUMP_COVERAGE_COMMAND="coverage help"
+
+# Output flake8 violations to violations.flake8.log
+# Don't exit with non-0 status code on Jenkins,
+# so that the build steps continue and a later step can decided whether to
+# UNSTABLE or FAILURE this build.
+export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
+
+rm .coverage* || echo "No coverage files to remove"
+
+tox --notest -e py27
+
+TOX_BIN=$WORKSPACE/.tox/py27/bin
+python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
+$TOX_BIN/pip install psycopg2
+$TOX_BIN/pip install lxml
+
+: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
+
+if [[ ! -e .dendron-base ]]; then
+ git clone https://github.com/matrix-org/dendron.git .dendron-base --mirror
+else
+ (cd .dendron-base; git fetch -p)
+fi
+
+rm -rf dendron
+git clone .dendron-base dendron --shared
+cd dendron
+
+: ${GOPATH:=${WORKSPACE}/.gopath}
+if [[ "${GOPATH}" != *:* ]]; then
+ mkdir -p "${GOPATH}"
+ export PATH="${GOPATH}/bin:${PATH}"
+fi
+export GOPATH
+
+git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
+
+go get github.com/constabulary/gb/...
+gb generate
+gb build
+
+cd ..
+
+
+if [[ ! -e .sytest-base ]]; then
+ git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror
+else
+ (cd .sytest-base; git fetch -p)
+fi
+
+rm -rf sytest
+git clone .sytest-base sytest --shared
+cd sytest
+
+git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
+
+: ${PORT_BASE:=8000}
+
+./jenkins/prep_sytest_for_postgres.sh
+
+echo >&2 "Running sytest with PostgreSQL";
+./jenkins/install_and_run.sh --python $TOX_BIN/python \
+ --synapse-directory $WORKSPACE \
+ --dendron $WORKSPACE/dendron/bin/dendron \
+ --synchrotron \
+ --pusher \
+ --port-base $PORT_BASE
+
+cd ..
--
cgit 1.5.1
From 18f0cc7d993408a754e7ff26e9474a969adf762a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 16:51:01 +0100
Subject: Record some more GC metrics
---
synapse/metrics/__init__.py | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index b29cec3de1..8f69aa1ff3 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -156,6 +156,11 @@ pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])
+reactor_metrics.register_callback("gc_total_objects", lambda: len(gc.get_objects()))
+reactor_metrics.register_callback(
+ "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
+)
+
def runUntilCurrentTimer(func):
--
cgit 1.5.1
From 0f2165ccf4fd0ae6636018cea7e1b91141179e88 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 17:00:45 +0100
Subject: Don't track total objects as its too expensive to calculate
---
synapse/metrics/__init__.py | 1 -
1 file changed, 1 deletion(-)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 8f69aa1ff3..bdd7292a30 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -156,7 +156,6 @@ pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])
-reactor_metrics.register_callback("gc_total_objects", lambda: len(gc.get_objects()))
reactor_metrics.register_callback(
"gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
)
--
cgit 1.5.1
From bab916bccc57734fb96f7f9be66b1b15b2ed4dbf Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 11:05:45 +0100
Subject: Bump version and changelog to v0.16.0-rc2
---
CHANGES.rst | 27 +++++++++++++++++++++++++++
synapse/__init__.py | 2 +-
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/CHANGES.rst b/CHANGES.rst
index e77b31b583..40f7ebd734 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,30 @@
+Changes in synapse v0.16.0-rc2 (2016-06-08)
+===========================================
+
+Features:
+
+* Add configuration option for tuning GC via ``gc.set_threshold`` (PR #849)
+
+Changes:
+
+* Record metrics about GC (PR #771, #847, #852)
+* Add metric counter for number of persisted events (PR #841)
+
+Bug fixes:
+
+* Fix 'From' header in email notifications (PR #843)
+* Fix presence where timeouts were not being fired for the first 8h after
+ restarts (PR #842)
+* Fix bug where synapse sent malformed transactions to AS's when retrying
+ transactions (Commits 310197b, 843790)
+
+Performance Improvements:
+
+* Remove event fetching from DB threads (PR #835)
+* Change the way we cache events (PR #836)
+* Add events to cache when we persist them (PR #840)
+
+
Changes in synapse v0.16.0-rc1 (2016-06-03)
===========================================
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 3b290db79f..ad088a7880 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.16.0-rc1"
+__version__ = "0.16.0-rc2"
--
cgit 1.5.1
From 66503a69c9070230c99737976bff73f68079e4d2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 11:13:56 +0100
Subject: Update commit hash in changelog
---
CHANGES.rst | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/CHANGES.rst b/CHANGES.rst
index 40f7ebd734..6194b3eb69 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -16,7 +16,7 @@ Bug fixes:
* Fix presence where timeouts were not being fired for the first 8h after
restarts (PR #842)
* Fix bug where synapse sent malformed transactions to AS's when retrying
- transactions (Commits 310197b, 843790)
+ transactions (Commits 310197b, 8437906)
Performance Improvements:
--
cgit 1.5.1
From 1a815fb04f1d17286be27379dd7463936606bd3a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 11:33:30 +0100
Subject: Don't hit DB for noop replications queries
---
synapse/handlers/typing.py | 3 +++
synapse/storage/account_data.py | 3 +++
synapse/storage/presence.py | 3 +++
synapse/storage/push_rule.py | 3 +++
synapse/storage/tags.py | 3 +++
5 files changed, 15 insertions(+)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 861b8f7989..5589296c09 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -221,6 +221,9 @@ class TypingHandler(object):
def get_all_typing_updates(self, last_id, current_id):
# TODO: Work out a way to do this without scanning the entire state.
+ if last_id == current_id:
+ return []
+
rows = []
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index ec7e8d40d2..3fa226e92d 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -138,6 +138,9 @@ class AccountDataStore(SQLBaseStore):
A deferred pair of lists of tuples of stream_id int, user_id string,
room_id string, type string, and content string.
"""
+ if last_room_id == current_id and last_global_id == current_id:
+ return defer.succeed(([], []))
+
def get_updated_account_data_txn(txn):
sql = (
"SELECT stream_id, user_id, account_data_type, content"
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 3fab57a7e8..d03f7c541e 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -118,6 +118,9 @@ class PresenceStore(SQLBaseStore):
)
def get_all_presence_updates(self, last_id, current_id):
+ if last_id == current_id:
+ return defer.succeed([])
+
def get_all_presence_updates_txn(txn):
sql = (
"SELECT stream_id, user_id, state, last_active_ts,"
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 786d6f6d67..8183b7f1b0 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -421,6 +421,9 @@ class PushRuleStore(SQLBaseStore):
def get_all_push_rule_updates(self, last_id, current_id, limit):
"""Get all the push rules changes that have happend on the server"""
+ if last_id == current_id:
+ return defer.succeed([])
+
def get_all_push_rule_updates_txn(txn):
sql = (
"SELECT stream_id, event_stream_ordering, user_id, rule_id,"
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 9da23f34cb..5a2c1aa59b 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -68,6 +68,9 @@ class TagsStore(SQLBaseStore):
A deferred list of tuples of stream_id int, user_id string,
room_id string, tag string and content string.
"""
+ if last_id == current_id:
+ defer.returnValue([])
+
def get_all_updated_tags_txn(txn):
sql = (
"SELECT stream_id, user_id, room_id"
--
cgit 1.5.1
From 17aab5827a1a1eace4e44d130eef7da4dda6984f Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 11:55:31 +0100
Subject: Add some logging for when servers ask for missing events
---
synapse/federation/federation_server.py | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f1d231b9d8..9f2a64dede 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -377,10 +377,20 @@ class FederationServer(FederationBase):
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
+ logger.info(
+ "on_get_missing_events: earliest_events: %r, latest_events: %r,"
+ " limit: %d, min_depth: %d",
+ earliest_events, latest_events, limit, min_depth
+ )
missing_events = yield self.handler.on_get_missing_events(
origin, room_id, earliest_events, latest_events, limit, min_depth
)
+ if len(missing_events) < 5:
+ logger.info("Returning %d events: %r", len(missing_events), missing_events)
+ else:
+ logger.info("Returning %d events", len(missing_events))
+
time_now = self._clock.time_msec()
defer.returnValue({
@@ -490,6 +500,11 @@ class FederationServer(FederationBase):
latest = set(latest)
latest |= seen
+ logger.info(
+ "Missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
+
missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
@@ -517,6 +532,10 @@ class FederationServer(FederationBase):
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if prevs - seen:
+ logger.info(
+ "Still missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
fetch_state = True
if fetch_state:
--
cgit 1.5.1
From 1fd6eb695d1fffbe830faf50c13607116300095b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 14:15:18 +0100
Subject: Enable auth on federation PublicRoomList
---
synapse/federation/transport/server.py | 5 -----
1 file changed, 5 deletions(-)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a1a334955f..ab9f38f010 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -532,11 +532,6 @@ class PublicRoomList(BaseFederationServlet):
data = yield self.room_list_handler.get_local_public_room_list()
defer.returnValue((200, data))
- # Avoid doing remote HS authorization checks which are done by default by
- # BaseFederationServlet.
- def _wrap(self, code):
- return code
-
SERVLET_CLASSES = (
FederationSendServlet,
--
cgit 1.5.1
From efeabd31801224cbacd31b61ff0d869b70b1820d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 14:23:15 +0100
Subject: Log user that is making /publicRooms calls
---
synapse/rest/client/v1/room.py | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index db52a1fc39..604c2a565e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -279,6 +279,13 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
+ try:
+ yield self.auth.get_user_by_req(request)
+ except AuthError:
+ # This endpoint isn't authed, but its useful to know who's hitting
+ # it if they *do* supply an access token
+ pass
+
handler = self.hs.get_room_list_handler()
data = yield handler.get_aggregated_public_room_list()
--
cgit 1.5.1
From d88faf92d16d9384433452e4fb7901fd2bd6eda4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 14:39:31 +0100
Subject: Fix up federation PublicRoomList
---
synapse/federation/transport/server.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index ab9f38f010..6fc3e2207c 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -528,7 +528,7 @@ class PublicRoomList(BaseFederationServlet):
PATH = "/publicRooms"
@defer.inlineCallbacks
- def on_GET(self, request):
+ def on_GET(self, origin, content, query):
data = yield self.room_list_handler.get_local_public_room_list()
defer.returnValue((200, data))
--
cgit 1.5.1
From 690029d1a3ebd26f56656a723fefdeafd71310e4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 14:47:42 +0100
Subject: Don't make rooms visibile by default
---
synapse/rest/client/v1/room.py | 2 --
1 file changed, 2 deletions(-)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 604c2a565e..86fbe2747d 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -72,8 +72,6 @@ class RoomCreateRestServlet(ClientV1RestServlet):
def get_room_config(self, request):
user_supplied_config = parse_json_object_from_request(request)
- # default visibility
- user_supplied_config.setdefault("visibility", "public")
return user_supplied_config
def on_OPTIONS(self, request):
--
cgit 1.5.1
From defa28efa186013fab18b3da76f60273cb6c3bb1 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Wed, 8 Jun 2016 15:11:31 +0100
Subject: Disable the synchrotron on jenkins until the sytest support lands
(#855)
* Disable the synchrotron on jenkins until the sytest support lands
* Poke jenkins
* Poke jenkins
* Poke jenkins
* Poke jenkins
* Poke jenkins
* Poke jenkins
* Poke jenkins
* Poke jenkins
---
jenkins-dendron-postgres.sh | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
index 8e3a4c51a9..d15836e6bf 100755
--- a/jenkins-dendron-postgres.sh
+++ b/jenkins-dendron-postgres.sh
@@ -73,11 +73,12 @@ git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling b
./jenkins/prep_sytest_for_postgres.sh
+mkdir -p var
+
echo >&2 "Running sytest with PostgreSQL";
./jenkins/install_and_run.sh --python $TOX_BIN/python \
--synapse-directory $WORKSPACE \
--dendron $WORKSPACE/dendron/bin/dendron \
- --synchrotron \
--pusher \
--port-base $PORT_BASE
--
cgit 1.5.1
From 81c07a32fd27260b5112dcc87845a9e87fa5db58 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 8 Jun 2016 15:43:37 +0100
Subject: Pull full state for each room all at once
---
synapse/handlers/room.py | 32 ++++++++++++++++----------------
1 file changed, 16 insertions(+), 16 deletions(-)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 9fd34588dd..ae44c7a556 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -20,7 +20,7 @@ from ._base import BaseHandler
from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
- EventTypes, JoinRules, RoomCreationPreset,
+ EventTypes, JoinRules, RoomCreationPreset, Membership,
)
from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.util import stringutils
@@ -367,14 +367,10 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def handle_room(room_id):
- # We pull each bit of state out indvidually to avoid pulling the
- # full state into memory. Due to how the caching works this should
- # be fairly quick, even if not originally in the cache.
- def get_state(etype, state_key):
- return self.state_handler.get_current_state(room_id, etype, state_key)
+ current_state = yield self.state_handler.get_current_state(room_id)
# Double check that this is actually a public room.
- join_rules_event = yield get_state(EventTypes.JoinRules, "")
+ join_rules_event = current_state.get((EventTypes.JoinRules, ""))
if join_rules_event:
join_rule = join_rules_event.content.get("join_rule", None)
if join_rule and join_rule != JoinRules.PUBLIC:
@@ -382,47 +378,51 @@ class RoomListHandler(BaseHandler):
result = {"room_id": room_id}
- joined_users = yield self.store.get_users_in_room(room_id)
- if len(joined_users) == 0:
+ num_joined_users = len([
+ 1 for _, event in current_state.items()
+ if event.type == EventTypes.Member
+ and event.membership == Membership.JOIN
+ ])
+ if num_joined_users == 0:
return
- result["num_joined_members"] = len(joined_users)
+ result["num_joined_members"] = num_joined_users
aliases = yield self.store.get_aliases_for_room(room_id)
if aliases:
result["aliases"] = aliases
- name_event = yield get_state(EventTypes.Name, "")
+ name_event = yield current_state.get((EventTypes.Name, ""))
if name_event:
name = name_event.content.get("name", None)
if name:
result["name"] = name
- topic_event = yield get_state(EventTypes.Topic, "")
+ topic_event = current_state.get((EventTypes.Topic, ""))
if topic_event:
topic = topic_event.content.get("topic", None)
if topic:
result["topic"] = topic
- canonical_event = yield get_state(EventTypes.CanonicalAlias, "")
+ canonical_event = current_state.get((EventTypes.CanonicalAlias, ""))
if canonical_event:
canonical_alias = canonical_event.content.get("alias", None)
if canonical_alias:
result["canonical_alias"] = canonical_alias
- visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "")
+ visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, ""))
visibility = None
if visibility_event:
visibility = visibility_event.content.get("history_visibility", None)
result["world_readable"] = visibility == "world_readable"
- guest_event = yield get_state(EventTypes.GuestAccess, "")
+ guest_event = current_state.get((EventTypes.GuestAccess, ""))
guest = None
if guest_event:
guest = guest_event.content.get("guest_access", None)
result["guest_can_join"] = guest == "can_join"
- avatar_event = yield get_state("m.room.avatar", "")
+ avatar_event = current_state.get(("m.room.avatar", ""))
if avatar_event:
avatar_url = avatar_event.content.get("url", None)
if avatar_url:
--
cgit 1.5.1
From 6e7dc7c7dde377794c23d5db6f25ffacfb08e82a Mon Sep 17 00:00:00 2001
From: Negar Fazeli
Date: Wed, 8 Jun 2016 19:16:46 +0200
Subject: Fix a bug caused by a change in auth_handler function Fix the
relevant unit test cases
---
synapse/handlers/register.py | 4 ++--
tests/handlers/test_register.py | 9 +++------
2 files changed, 5 insertions(+), 8 deletions(-)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index bbc07b045e..e0aaefe7be 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -388,8 +388,8 @@ class RegistrationHandler(BaseHandler):
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
- auth_handler = self.hs.get_handlers().auth_handler
- token = auth_handler.generate_short_term_login_token(user_id, duration_seconds)
+ token = self.auth_handler().generate_short_term_login_token(
+ user_id, duration_seconds)
if need_register:
yield self.store.register(
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index 9d5c653b45..69a5e5b1d4 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -41,14 +41,15 @@ class RegistrationTestCase(unittest.TestCase):
handlers=None,
http_client=None,
expire_access_token=True)
+ self.auth_handler = Mock(
+ generate_short_term_login_token=Mock(return_value='secret'))
self.hs.handlers = RegistrationHandlers(self.hs)
self.handler = self.hs.get_handlers().registration_handler
self.hs.get_handlers().profile_handler = Mock()
self.mock_handler = Mock(spec=[
"generate_short_term_login_token",
])
-
- self.hs.get_handlers().auth_handler = self.mock_handler
+ self.hs.get_auth_handler = Mock(return_value=self.auth_handler)
@defer.inlineCallbacks
def test_user_is_created_and_logged_in_if_doesnt_exist(self):
@@ -56,8 +57,6 @@ class RegistrationTestCase(unittest.TestCase):
local_part = "someone"
display_name = "someone"
user_id = "@someone:test"
- mock_token = self.mock_handler.generate_short_term_login_token
- mock_token.return_value = 'secret'
result_user_id, result_token = yield self.handler.get_or_create_user(
local_part, display_name, duration_ms)
self.assertEquals(result_user_id, user_id)
@@ -75,8 +74,6 @@ class RegistrationTestCase(unittest.TestCase):
local_part = "frank"
display_name = "Frank"
user_id = "@frank:test"
- mock_token = self.mock_handler.generate_short_term_login_token
- mock_token.return_value = 'secret'
result_user_id, result_token = yield self.handler.get_or_create_user(
local_part, display_name, duration_ms)
self.assertEquals(result_user_id, user_id)
--
cgit 1.5.1
From 95f305c35a790e8f10fef7e16268dfaba6bc4c31 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 9 Jun 2016 10:57:11 +0100
Subject: Remove redundant exception log in /events
---
synapse/rest/client/v1/events.py | 45 +++++++++++++++++++---------------------
1 file changed, 21 insertions(+), 24 deletions(-)
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index d1afa0f0d5..498bb9e18a 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -45,30 +45,27 @@ class EventStreamRestServlet(ClientV1RestServlet):
raise SynapseError(400, "Guest users must specify room_id param")
if "room_id" in request.args:
room_id = request.args["room_id"][0]
- try:
- handler = self.handlers.event_stream_handler
- pagin_config = PaginationConfig.from_request(request)
- timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
- if "timeout" in request.args:
- try:
- timeout = int(request.args["timeout"][0])
- except ValueError:
- raise SynapseError(400, "timeout must be in milliseconds.")
-
- as_client_event = "raw" not in request.args
-
- chunk = yield handler.get_stream(
- requester.user.to_string(),
- pagin_config,
- timeout=timeout,
- as_client_event=as_client_event,
- affect_presence=(not is_guest),
- room_id=room_id,
- is_guest=is_guest,
- )
- except:
- logger.exception("Event stream failed")
- raise
+
+ handler = self.handlers.event_stream_handler
+ pagin_config = PaginationConfig.from_request(request)
+ timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
+ if "timeout" in request.args:
+ try:
+ timeout = int(request.args["timeout"][0])
+ except ValueError:
+ raise SynapseError(400, "timeout must be in milliseconds.")
+
+ as_client_event = "raw" not in request.args
+
+ chunk = yield handler.get_stream(
+ requester.user.to_string(),
+ pagin_config,
+ timeout=timeout,
+ as_client_event=as_client_event,
+ affect_presence=(not is_guest),
+ room_id=room_id,
+ is_guest=is_guest,
+ )
defer.returnValue((200, chunk))
--
cgit 1.5.1
From eba4ff1bcbd99ae5b23f7cdae2306662319d3b4a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 9 Jun 2016 11:29:43 +0100
Subject: 502 on /thumbnail when can't contact remote server
---
synapse/rest/media/v1/media_repository.py | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index d96bf9afe2..2468c3ac42 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -26,6 +26,7 @@ from .thumbnailer import Thumbnailer
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.util.stringutils import random_string
+from synapse.api.errors import SynapseError
from twisted.internet import defer, threads
@@ -134,10 +135,15 @@ class MediaRepository(object):
request_path = "/".join((
"/_matrix/media/v1/download", server_name, media_id,
))
- length, headers = yield self.client.get_file(
- server_name, request_path, output_stream=f,
- max_size=self.max_upload_size,
- )
+ try:
+ length, headers = yield self.client.get_file(
+ server_name, request_path, output_stream=f,
+ max_size=self.max_upload_size,
+ )
+ except Exception as e:
+ logger.warn("Failed to fetch remoted media %r", e)
+ raise SynapseError(502, "Failed to fetch remoted media")
+
media_type = headers["Content-Type"][0]
time_now_ms = self.clock.time_msec()
--
cgit 1.5.1
From a31befbcd0f8238920b0ca198d35ef657c78e766 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 9 Jun 2016 13:23:41 +0100
Subject: Bump version and changelog
---
CHANGES.rst | 10 ++++++++++
synapse/__init__.py | 2 +-
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/CHANGES.rst b/CHANGES.rst
index 6194b3eb69..ad974c586b 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,13 @@
+Changes in synapse v0.16.0 (2016-06-09)
+=======================================
+
+NB: As of v0.14 all AS config files must have an ID specified.
+
+
+Bug fixes:
+
+* Don't make rooms published by default (PR #857)
+
Changes in synapse v0.16.0-rc2 (2016-06-08)
===========================================
diff --git a/synapse/__init__.py b/synapse/__init__.py
index ad088a7880..dc211e9637 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.16.0-rc2"
+__version__ = "0.16.0"
--
cgit 1.5.1
From 8327d5df709f1726c961743d937852e487648f5b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 9 Jun 2016 14:16:26 +0100
Subject: Change CHANGELOG
---
CHANGES.rst | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/CHANGES.rst b/CHANGES.rst
index ad974c586b..32f18e7098 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,7 +1,7 @@
Changes in synapse v0.16.0 (2016-06-09)
=======================================
-NB: As of v0.14 all AS config files must have an ID specified.
+NB: As of v0.14 all AS config files must have an ID field.
Bug fixes:
--
cgit 1.5.1
From 7dbb473339bc41daf6c05b64756f97e011f653f5 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 9 Jun 2016 18:50:38 +0100
Subject: Add function to load config without generating it
Renames ``load_config`` to ``load_or_generate_config``
Adds a method called ``load_config`` that just loads the
config.
The main synapse.app.homeserver will continue to use
``load_or_generate_config`` to retain backwards compat.
However new worker processes can use ``load_config`` to
load the config avoiding some of the cruft needed to generate
the config.
As the new ``load_config`` method is expected to be used by new
configs it removes support for the legacy commandline overrides
that ``load_or_generate_config`` supports
---
synapse/app/homeserver.py | 3 +-
synapse/config/_base.py | 147 ++++++++++++++++++++++++++++++------------
tests/config/test_generate.py | 2 +-
tests/config/test_load.py | 22 ++++++-
4 files changed, 126 insertions(+), 48 deletions(-)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 22e1721fc4..40ffd9bf0d 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -266,10 +266,9 @@ def setup(config_options):
HomeServer
"""
try:
- config = HomeServerConfig.load_config(
+ config = HomeServerConfig.load_or_generate_config(
"Synapse Homeserver",
config_options,
- generate_section="Homeserver"
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 7449f36491..af9f17bf7b 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -157,9 +157,40 @@ class Config(object):
return default_config, config
@classmethod
- def load_config(cls, description, argv, generate_section=None):
+ def load_config(cls, description, argv):
+ config_parser = argparse.ArgumentParser(
+ description=description,
+ )
+ config_parser.add_argument(
+ "-c", "--config-path",
+ action="append",
+ metavar="CONFIG_FILE",
+ help="Specify config file. Can be given multiple times and"
+ " may specify directories containing *.yaml files."
+ )
+
+ config_parser.add_argument(
+ "--keys-directory",
+ metavar="DIRECTORY",
+ help="Where files such as certs and signing keys are stored when"
+ " their location is given explicitly in the config."
+ " Defaults to the directory containing the last config file",
+ )
+
+ config_args = config_parser.parse_args(argv)
+
+ config_files = find_config_files(search_paths=config_args.config_path)
+
obj = cls()
+ obj.read_config_files(
+ config_files,
+ keys_directory=config_args.keys_directory,
+ generate_keys=False,
+ )
+ return obj
+ @classmethod
+ def load_or_generate_config(cls, description, argv):
config_parser = argparse.ArgumentParser(add_help=False)
config_parser.add_argument(
"-c", "--config-path",
@@ -176,7 +207,7 @@ class Config(object):
config_parser.add_argument(
"--report-stats",
action="store",
- help="Stuff",
+ help="Whether the generated config reports anonymized usage statistics",
choices=["yes", "no"]
)
config_parser.add_argument(
@@ -197,36 +228,11 @@ class Config(object):
)
config_args, remaining_args = config_parser.parse_known_args(argv)
+ config_files = find_config_files(search_paths=config_args.config_path)
+
generate_keys = config_args.generate_keys
- config_files = []
- if config_args.config_path:
- for config_path in config_args.config_path:
- if os.path.isdir(config_path):
- # We accept specifying directories as config paths, we search
- # inside that directory for all files matching *.yaml, and then
- # we apply them in *sorted* order.
- files = []
- for entry in os.listdir(config_path):
- entry_path = os.path.join(config_path, entry)
- if not os.path.isfile(entry_path):
- print (
- "Found subdirectory in config directory: %r. IGNORING."
- ) % (entry_path, )
- continue
-
- if not entry.endswith(".yaml"):
- print (
- "Found file in config directory that does not"
- " end in '.yaml': %r. IGNORING."
- ) % (entry_path, )
- continue
-
- files.append(entry_path)
-
- config_files.extend(sorted(files))
- else:
- config_files.append(config_path)
+ obj = cls()
if config_args.generate_config:
if config_args.report_stats is None:
@@ -299,28 +305,43 @@ class Config(object):
" -c CONFIG-FILE\""
)
- if config_args.keys_directory:
- config_dir_path = config_args.keys_directory
- else:
- config_dir_path = os.path.dirname(config_args.config_path[-1])
- config_dir_path = os.path.abspath(config_dir_path)
+ obj.read_config_files(
+ config_files,
+ keys_directory=config_args.keys_directory,
+ generate_keys=generate_keys,
+ )
+
+ if generate_keys:
+ return None
+
+ obj.invoke_all("read_arguments", args)
+
+ return obj
+
+ def read_config_files(self, config_files, keys_directory=None,
+ generate_keys=False):
+ if not keys_directory:
+ keys_directory = os.path.dirname(config_files[-1])
+
+ config_dir_path = os.path.abspath(keys_directory)
specified_config = {}
for config_file in config_files:
- yaml_config = cls.read_config_file(config_file)
+ yaml_config = self.read_config_file(config_file)
specified_config.update(yaml_config)
if "server_name" not in specified_config:
raise ConfigError(MISSING_SERVER_NAME)
server_name = specified_config["server_name"]
- _, config = obj.generate_config(
+ _, config = self.generate_config(
config_dir_path=config_dir_path,
server_name=server_name,
is_generating_file=False,
)
config.pop("log_config")
config.update(specified_config)
+
if "report_stats" not in config:
raise ConfigError(
MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
@@ -328,11 +349,51 @@ class Config(object):
)
if generate_keys:
- obj.invoke_all("generate_files", config)
+ self.invoke_all("generate_files", config)
return
- obj.invoke_all("read_config", config)
-
- obj.invoke_all("read_arguments", args)
-
- return obj
+ self.invoke_all("read_config", config)
+
+
+def find_config_files(search_paths):
+ """Finds config files using a list of search paths. If a path is a file
+ then that file path is added to the list. If a search path is a directory
+ then all the "*.yaml" files in that directory are added to the list in
+ sorted order.
+
+ Args:
+ search_paths(list(str)): A list of paths to search.
+
+ Returns:
+ list(str): A list of file paths.
+ """
+
+ config_files = []
+ if search_paths:
+ for config_path in search_paths:
+ if os.path.isdir(config_path):
+ # We accept specifying directories as config paths, we search
+ # inside that directory for all files matching *.yaml, and then
+ # we apply them in *sorted* order.
+ files = []
+ for entry in os.listdir(config_path):
+ entry_path = os.path.join(config_path, entry)
+ if not os.path.isfile(entry_path):
+ print (
+ "Found subdirectory in config directory: %r. IGNORING."
+ ) % (entry_path, )
+ continue
+
+ if not entry.endswith(".yaml"):
+ print (
+ "Found file in config directory that does not"
+ " end in '.yaml': %r. IGNORING."
+ ) % (entry_path, )
+ continue
+
+ files.append(entry_path)
+
+ config_files.extend(sorted(files))
+ else:
+ config_files.append(config_path)
+ return config_files
diff --git a/tests/config/test_generate.py b/tests/config/test_generate.py
index 4329d73974..8f57fbeb23 100644
--- a/tests/config/test_generate.py
+++ b/tests/config/test_generate.py
@@ -30,7 +30,7 @@ class ConfigGenerationTestCase(unittest.TestCase):
shutil.rmtree(self.dir)
def test_generate_config_generates_files(self):
- HomeServerConfig.load_config("", [
+ HomeServerConfig.load_or_generate_config("", [
"--generate-config",
"-c", self.file,
"--report-stats=yes",
diff --git a/tests/config/test_load.py b/tests/config/test_load.py
index bf46233c5c..161a87d7e3 100644
--- a/tests/config/test_load.py
+++ b/tests/config/test_load.py
@@ -34,6 +34,8 @@ class ConfigLoadingTestCase(unittest.TestCase):
self.generate_config_and_remove_lines_containing("server_name")
with self.assertRaises(Exception):
HomeServerConfig.load_config("", ["-c", self.file])
+ with self.assertRaises(Exception):
+ HomeServerConfig.load_or_generate_config("", ["-c", self.file])
def test_generates_and_loads_macaroon_secret_key(self):
self.generate_config()
@@ -54,11 +56,24 @@ class ConfigLoadingTestCase(unittest.TestCase):
"was: %r" % (config.macaroon_secret_key,)
)
+ config = HomeServerConfig.load_or_generate_config("", ["-c", self.file])
+ self.assertTrue(
+ hasattr(config, "macaroon_secret_key"),
+ "Want config to have attr macaroon_secret_key"
+ )
+ if len(config.macaroon_secret_key) < 5:
+ self.fail(
+ "Want macaroon secret key to be string of at least length 5,"
+ "was: %r" % (config.macaroon_secret_key,)
+ )
+
def test_load_succeeds_if_macaroon_secret_key_missing(self):
self.generate_config_and_remove_lines_containing("macaroon")
config1 = HomeServerConfig.load_config("", ["-c", self.file])
config2 = HomeServerConfig.load_config("", ["-c", self.file])
+ config3 = HomeServerConfig.load_or_generate_config("", ["-c", self.file])
self.assertEqual(config1.macaroon_secret_key, config2.macaroon_secret_key)
+ self.assertEqual(config1.macaroon_secret_key, config3.macaroon_secret_key)
def test_disable_registration(self):
self.generate_config()
@@ -70,14 +85,17 @@ class ConfigLoadingTestCase(unittest.TestCase):
config = HomeServerConfig.load_config("", ["-c", self.file])
self.assertFalse(config.enable_registration)
+ config = HomeServerConfig.load_or_generate_config("", ["-c", self.file])
+ self.assertFalse(config.enable_registration)
+
# Check that either config value is clobbered by the command line.
- config = HomeServerConfig.load_config("", [
+ config = HomeServerConfig.load_or_generate_config("", [
"-c", self.file, "--enable-registration"
])
self.assertTrue(config.enable_registration)
def generate_config(self):
- HomeServerConfig.load_config("", [
+ HomeServerConfig.load_or_generate_config("", [
"--generate-config",
"-c", self.file,
"--report-stats=yes",
--
cgit 1.5.1
From 50f69e2ef266cf08aaff0311705fcf56dc1bd9f3 Mon Sep 17 00:00:00 2001
From: Bartek Rutkowski
Date: Fri, 10 Jun 2016 11:33:43 +0100
Subject: Change /bin/bash to /bin/sh in tox.ini
No features of Bash are used here, so using /bin/sh makes it more portable to systems that don't have Bash natively (like BSD systems).
---
tox.ini | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/tox.ini b/tox.ini
index 757b7189c3..52d93c65e5 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,7 +11,7 @@ deps =
setenv =
PYTHONDONTWRITEBYTECODE = no_byte_code
commands =
- /bin/bash -c "find {toxinidir} -name '*.pyc' -delete ; coverage run {env:COVERAGE_OPTS:} --source={toxinidir}/synapse \
+ /bin/sh -c "find {toxinidir} -name '*.pyc' -delete ; coverage run {env:COVERAGE_OPTS:} --source={toxinidir}/synapse \
{envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}"
{env:DUMP_COVERAGE_COMMAND:coverage report -m}
@@ -26,4 +26,4 @@ skip_install = True
basepython = python2.7
deps =
flake8
-commands = /bin/bash -c "flake8 synapse tests {env:PEP8SUFFIX:}"
+commands = /bin/sh -c "flake8 synapse tests {env:PEP8SUFFIX:}"
--
cgit 1.5.1
From 36e2aade8790f3f2d86e8f6cc8a6de21e8bec4fa Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 14 Jun 2016 13:25:29 +0100
Subject: Make get_domain_from_id throw SynapseError on invalid ID
---
synapse/types.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/synapse/types.py b/synapse/types.py
index 7b6ae44bdd..f639651a73 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -22,7 +22,10 @@ Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
def get_domain_from_id(string):
- return string.split(":", 1)[1]
+ try:
+ return string.split(":", 1)[1]
+ except IndexError:
+ raise SynapseError(400, "Invalid ID: %r", string)
class DomainSpecificString(
--
cgit 1.5.1
|