From e1625d62a8313ff34662aa72ae4d0574e540cc2b Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 11:55:57 +0100 Subject: Add federation room list servlet --- synapse/config/server.py | 6 ++++ synapse/federation/transport/server.py | 65 +++++++++++++++++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/synapse/config/server.py b/synapse/config/server.py index 0b5f462e44..8d554d749d 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -29,6 +29,7 @@ class ServerConfig(Config): self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", True) self.public_baseurl = config.get("public_baseurl") + self.secondary_directory_servers = config.get("secondary_directory_servers", []) if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': @@ -156,6 +157,11 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # A list of other Home Servers to fetch the public room directory from + # and include in the public room directory of this home server + # secondary_directory_servers: + # - matrix.org + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 5b6c7d11dd..b82f72fd57 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -134,10 +134,11 @@ class Authenticator(object): class BaseFederationServlet(object): - def __init__(self, handler, authenticator, ratelimiter, server_name): + def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter + self.room_list_handler = room_list_handler def _wrap(self, code): authenticator = self.authenticator @@ -491,6 +492,66 @@ class OpenIdUserInfo(BaseFederationServlet): def _wrap(self, code): return code +class PublicRoomList(BaseFederationServlet): + """ + Fetch the public room list for this server. + + This API returns information in the same format as /publicRooms on the + client API, but will only ever include local public rooms and hence is + intended for consumption by other home servers. + + GET /publicRooms HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "chunk": [ + { + "aliases": [ + "#test:localhost" + ], + "guest_can_join": false, + "name": "test room", + "num_joined_members": 3, + "room_id": "!whkydVegtvatLfXmPN:localhost", + "world_readable": false + } + ], + "end": "END", + "start": "START" + } + """ + + PATH = "/publicRooms" + + @defer.inlineCallbacks + def on_GET(self, request): + data = yield self.room_list_handler.get_public_room_list() + defer.returnValue((200, data)) + + token = parse_string(request, "access_token") + if token is None: + defer.returnValue((401, { + "errcode": "M_MISSING_TOKEN", "error": "Access Token required" + })) + return + + user_id = yield self.handler.on_openid_userinfo(token) + + if user_id is None: + defer.returnValue((401, { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired" + })) + + defer.returnValue((200, {"sub": user_id})) + + # Avoid doing remote HS authorization checks which are done by default by + # BaseFederationServlet. + def _wrap(self, code): + return code + SERVLET_CLASSES = ( FederationSendServlet, @@ -513,6 +574,7 @@ SERVLET_CLASSES = ( FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, OpenIdUserInfo, + PublicRoomList, ) @@ -523,4 +585,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter): authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, + room_list_handler=hs.get_room_list_handler(), ).register(resource) -- cgit 1.5.1 From 70ecb415f553e5de86833034ce184a8a905b7ed5 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 12:00:54 +0100 Subject: Fix c+p fail --- synapse/federation/transport/server.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b82f72fd57..f23c02efde 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -530,23 +530,6 @@ class PublicRoomList(BaseFederationServlet): data = yield self.room_list_handler.get_public_room_list() defer.returnValue((200, data)) - token = parse_string(request, "access_token") - if token is None: - defer.returnValue((401, { - "errcode": "M_MISSING_TOKEN", "error": "Access Token required" - })) - return - - user_id = yield self.handler.on_openid_userinfo(token) - - if user_id is None: - defer.returnValue((401, { - "errcode": "M_UNKNOWN_TOKEN", - "error": "Access Token unknown or expired" - })) - - defer.returnValue((200, {"sub": user_id})) - # Avoid doing remote HS authorization checks which are done by default by # BaseFederationServlet. def _wrap(self, code): -- cgit 1.5.1 From e5b0bbcd3381e581ecf9876760bbe36c66fcb4fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 May 2016 13:46:58 +0100 Subject: Add caches to bulk_get_push_rules* --- synapse/push/bulk_push_rule_evaluator.py | 8 +++++--- synapse/storage/push_rule.py | 16 ++++++++++------ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 25e13b3423..25f2fb9da4 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -29,6 +29,7 @@ logger = logging.getLogger(__name__) def decode_rule_json(rule): + rule = dict(rule) rule['conditions'] = json.loads(rule['conditions']) rule['actions'] = json.loads(rule['actions']) return rule @@ -39,6 +40,8 @@ def _get_rules(room_id, user_ids, store): rules_by_user = yield store.bulk_get_push_rules(user_ids) rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids) + rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} + rules_by_user = { uid: list_with_base_rules([ decode_rule_json(rule_list) @@ -51,11 +54,10 @@ def _get_rules(room_id, user_ids, store): # fetch disabled rules, but this won't account for any server default # rules the user has disabled, so we need to do this too. for uid in user_ids: - if uid not in rules_enabled_by_user: + user_enabled_map = rules_enabled_by_user.get(uid) + if not user_enabled_map: continue - user_enabled_map = rules_enabled_by_user[uid] - for i, rule in enumerate(rules_by_user[uid]): rule_id = rule['rule_id'] diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index d2bf7f2aec..f285f59afd 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from twisted.internet import defer import logging @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) class PushRuleStore(SQLBaseStore): - @cachedInlineCallbacks() + @cachedInlineCallbacks(lru=True) def get_push_rules_for_user(self, user_id): rows = yield self._simple_select_list( table="push_rules", @@ -44,7 +44,7 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(rows) - @cachedInlineCallbacks() + @cachedInlineCallbacks(lru=True) def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( table="push_rules_enable", @@ -60,7 +60,8 @@ class PushRuleStore(SQLBaseStore): r['rule_id']: False if r['enabled'] == 0 else True for r in results }) - @defer.inlineCallbacks + @cachedList(cached_method_name="get_push_rules_for_user", + list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules(self, user_ids): if not user_ids: defer.returnValue({}) @@ -75,13 +76,16 @@ class PushRuleStore(SQLBaseStore): desc="bulk_get_push_rules", ) - rows.sort(key=lambda e: (-e["priority_class"], -e["priority"])) + rows.sort( + key=lambda row: (-int(row["priority_class"]), -int(row["priority"])) + ) for row in rows: results.setdefault(row['user_name'], []).append(row) defer.returnValue(results) - @defer.inlineCallbacks + @cachedList(cached_method_name="get_push_rules_enabled_for_user", + list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules_enabled(self, user_ids): if not user_ids: defer.returnValue({}) -- cgit 1.5.1 From 4efa389299bfef694af41b642ba9623a8be5df93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 May 2016 15:37:53 +0100 Subject: Fix GET /push_rules --- synapse/rest/client/v1/push_rule.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 02d837ee6a..c135353aab 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -132,6 +132,9 @@ class PushRuleRestServlet(ClientV1RestServlet): enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) + rawrules = {k: v for k, v in rawrules.item() if k is not None} + enabled_map = {k: v for k, v in enabled_map.item() if k is not None} + rules = format_push_rules_for_user(requester.user, rawrules, enabled_map) path = request.postpath[1:] -- cgit 1.5.1 From cca0093fa96cfec0566ff790ef990fad8b6763bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 May 2016 15:44:08 +0100 Subject: Change fix --- synapse/rest/client/v1/push_rule.py | 3 --- synapse/storage/push_rule.py | 10 ++++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index c135353aab..02d837ee6a 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -132,9 +132,6 @@ class PushRuleRestServlet(ClientV1RestServlet): enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) - rawrules = {k: v for k, v in rawrules.item() if k is not None} - enabled_map = {k: v for k, v in enabled_map.item() if k is not None} - rules = format_push_rules_for_user(requester.user, rawrules, enabled_map) path = request.postpath[1:] diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index f285f59afd..65c20e8900 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -66,7 +66,10 @@ class PushRuleStore(SQLBaseStore): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: [] + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules", @@ -90,7 +93,10 @@ class PushRuleStore(SQLBaseStore): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: [] + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules_enable", -- cgit 1.5.1 From 1d4ee854e21626f64c945610e467f7e761534424 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 May 2016 15:45:53 +0100 Subject: Fix typo --- synapse/storage/push_rule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 65c20e8900..216a8bf69c 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -94,7 +94,7 @@ class PushRuleStore(SQLBaseStore): defer.returnValue({}) results = { - user_id: [] + user_id: {} for user_id in user_ids } -- cgit 1.5.1 From c8c5bf950a27e00e3e9ae57b98f38cab03cdc3c9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 31 May 2016 17:10:40 +0100 Subject: Fix synapse/storage/schema/delta/30/as_users.py --- synapse/storage/schema/delta/30/as_users.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index b417e3ac08..5b7d8d1ab5 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from synapse.storage.appservice import ApplicationServiceStore +from synapse.config.appservice import load_appservices logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): logger.warning("Could not get app_service_config_files from config") pass - appservices = ApplicationServiceStore.load_appservices( + appservices = load_appservices( config.server_name, config_files ) -- cgit 1.5.1 From d240796dedcfae1f6929c1501e7e335df417cfaf Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:20:07 +0100 Subject: Basic, un-cached support for secondary_directory_servers --- synapse/federation/federation_client.py | 21 +++++++++++++++++++++ synapse/federation/transport/client.py | 12 ++++++++++++ synapse/federation/transport/server.py | 2 +- synapse/handlers/room.py | 33 ++++++++++++++++++++++++++++++++- synapse/rest/client/v1/room.py | 3 ++- 5 files changed, 68 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 37ee469fa2..ba8d71c050 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,6 +24,7 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -550,6 +551,26 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") + @defer.inlineCallbacks + def get_public_rooms(self, destinations): + results_by_server = {} + + @defer.inlineCallbacks + def _get_result(s): + if s == self.server_name: + defer.returnValue() + + try: + result = yield self.transport_layer.get_public_rooms(s) + results_by_server[s] = result + except: + logger.exception("Error getting room list from server %r", s) + + + yield concurrently_execute(_get_result, destinations, 3) + + defer.returnValue(results_by_server) + @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): """ diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index cd2841c4db..ebb698e278 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -224,6 +224,18 @@ class TransportLayerClient(object): defer.returnValue(response) + @defer.inlineCallbacks + @log_function + def get_public_rooms(self, remote_server): + path = PREFIX + "/publicRooms" + + response = yield self.client.get_json( + destination=remote_server, + path=path, + ) + + defer.returnValue(response) + @defer.inlineCallbacks @log_function def exchange_third_party_invite(self, destination, room_id, event_dict): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index f23c02efde..da9e7a326d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -527,7 +527,7 @@ class PublicRoomList(BaseFederationServlet): @defer.inlineCallbacks def on_GET(self, request): - data = yield self.room_list_handler.get_public_room_list() + data = yield self.room_list_handler.get_local_public_room_list() defer.returnValue((200, data)) # Avoid doing remote HS authorization checks which are done by default by diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3d63b3c513..b0aa9fb511 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,7 +345,7 @@ class RoomListHandler(BaseHandler): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache() - def get_public_room_list(self): + def get_local_public_room_list(self): result = self.response_cache.get(()) if not result: result = self.response_cache.set((), self._get_public_room_list()) @@ -427,6 +427,37 @@ class RoomListHandler(BaseHandler): # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": results}) + @defer.inlineCallbacks + def get_aggregated_public_room_list(self): + """ + Get the public room list from this server and the servers + specified in the secondary_directory_servers config option. + XXX: Pagination... + """ + federated_by_server = yield self.hs.get_replication_layer().get_public_rooms( + self.hs.config.secondary_directory_servers + ) + public_rooms = yield self.get_local_public_room_list() + + # keep track of which room IDs we've seen so we can de-dup + room_ids = set() + + # tag all the ones in our list with our server name. + # Also add the them to the de-deping set + for room in public_rooms['chunk']: + room["server_name"] = self.hs.hostname + room_ids.add(room["room_id"]) + + # Now add the results from federation + for server_name, server_result in federated_by_server.items(): + for room in server_result["chunk"]: + if room["room_id"] not in room_ids: + room["server_name"] = server_name + public_rooms["chunk"].append(room) + room_ids.add(room["room_id"]) + + defer.returnValue(public_rooms) + class RoomContextHandler(BaseHandler): @defer.inlineCallbacks diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 2d22bbdaa3..db52a1fc39 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -280,7 +280,8 @@ class PublicRoomListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): handler = self.hs.get_room_list_handler() - data = yield handler.get_public_room_list() + data = yield handler.get_aggregated_public_room_list() + defer.returnValue((200, data)) -- cgit 1.5.1 From 963e3ed2828f6d1e704678af971ceffff3076115 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:22:53 +0100 Subject: Apparently I am not permitted to have two blank lines here --- synapse/federation/federation_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ba8d71c050..d835c1b038 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -566,7 +566,6 @@ class FederationClient(FederationBase): except: logger.exception("Error getting room list from server %r", s) - yield concurrently_execute(_get_result, destinations, 3) defer.returnValue(results_by_server) -- cgit 1.5.1 From 6ca4d3ae9add2cf2bab3cf074072d7a7bb0b6553 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:24:50 +0100 Subject: Add vector.im to default secondary_directory_servers and add comment explaining it's not a permanent solution --- synapse/config/server.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/config/server.py b/synapse/config/server.py index 8d554d749d..c2d8f8a52f 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -159,8 +159,12 @@ class ServerConfig(Config): # A list of other Home Servers to fetch the public room directory from # and include in the public room directory of this home server + # This is a temporary stopgap solution to populate new server with a + # list of rooms until there exists a good solution of a decentralized + # room directory. # secondary_directory_servers: # - matrix.org + # - vector.im # List of ports that Synapse should listen on, their purpose and their # configuration. -- cgit 1.5.1 From 2a449fec4d03b7739269e832683905a18459e4c3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 18:27:23 +0100 Subject: Add cache to remote room lists Poll for updates from remote servers, waiting for the poll if there's no cache entry. --- synapse/handlers/room.py | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b0aa9fb511..77063b021a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -36,6 +36,8 @@ import string logger = logging.getLogger(__name__) +REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 + id_server_scheme = "https://" @@ -344,6 +346,12 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache() + self.remote_list_request_cache = ResponseCache() + self.remote_list_cache = {} + self.fetch_looping_call = hs.get_clock().looping_call( + self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL + ) + self.fetch_all_remote_lists() def get_local_public_room_list(self): result = self.response_cache.get(()) @@ -427,6 +435,14 @@ class RoomListHandler(BaseHandler): # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": results}) + @defer.inlineCallbacks + def fetch_all_remote_lists(self): + deferred = self.hs.get_replication_layer().get_public_rooms( + self.hs.config.secondary_directory_servers + ) + self.remote_list_request_cache.set((), deferred) + yield deferred + @defer.inlineCallbacks def get_aggregated_public_room_list(self): """ @@ -434,9 +450,19 @@ class RoomListHandler(BaseHandler): specified in the secondary_directory_servers config option. XXX: Pagination... """ - federated_by_server = yield self.hs.get_replication_layer().get_public_rooms( - self.hs.config.secondary_directory_servers - ) + # We return the results from out cache which is updated by a looping call, + # unless we're missing a cache entry, in which case wait for the result + # of the fetch if there's one in progress. If not, omit that server. + wait = False + for s in self.hs.config.secondary_directory_servers: + if s not in self.remote_list_cache: + logger.warn("No cached room list from %s: waiting for fetch", s) + wait = True + break + + if wait and self.remote_list_request_cache.get(()): + yield self.remote_list_request_cache.get(()) + public_rooms = yield self.get_local_public_room_list() # keep track of which room IDs we've seen so we can de-dup @@ -449,7 +475,7 @@ class RoomListHandler(BaseHandler): room_ids.add(room["room_id"]) # Now add the results from federation - for server_name, server_result in federated_by_server.items(): + for server_name, server_result in self.remote_list_cache.items(): for room in server_result["chunk"]: if room["room_id"] not in room_ids: room["server_name"] = server_name -- cgit 1.5.1 From 58ee43d020804448fe2ae504072e3c2addae6eb1 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 31 May 2016 20:28:42 +0100 Subject: handle emotes & notices correctly in email notifs --- res/templates/notif.html | 6 +++++- res/templates/notif.txt | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/res/templates/notif.html b/res/templates/notif.html index 834840861e..88b921ca9c 100644 --- a/res/templates/notif.html +++ b/res/templates/notif.html @@ -17,11 +17,15 @@ {% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %} -
{{ message.sender_name }}
+
{% if message.msgtype == "m.emote" %}*{% endif %} {{ message.sender_name }}
{% endif %}
{% if message.msgtype == "m.text" %} {{ message.body_text_html }} + {% elif message.msgtype == "m.emote" %} + {{ message.body_text_html }} + {% elif message.msgtype == "m.notice" %} + {{ message.body_text_html }} {% elif message.msgtype == "m.image" %} {% elif message.msgtype == "m.file" %} diff --git a/res/templates/notif.txt b/res/templates/notif.txt index a3ddac80ce..a37bee9833 100644 --- a/res/templates/notif.txt +++ b/res/templates/notif.txt @@ -1,7 +1,11 @@ {% for message in notif.messages %} -{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }}) +{% if message.msgtype == "m.emote" %}* {% endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }}) {% if message.msgtype == "m.text" %} {{ message.body_text_plain }} +{% elif message.msgtype == "m.emote" %} +{{ message.body_text_plain }} +{% elif message.msgtype == "m.notice" %} +{{ message.body_text_plain }} {% elif message.msgtype == "m.image" %} {{ message.body_text_plain }} {% elif message.msgtype == "m.file" %} -- cgit 1.5.1 From 6ecb2ca4ec3fae8c6f2e837b4ec99cc6929de638 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 1 Jun 2016 09:48:55 +0100 Subject: pep8 --- synapse/federation/transport/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index da9e7a326d..a1a334955f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -134,7 +134,8 @@ class Authenticator(object): class BaseFederationServlet(object): - def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler): + def __init__(self, handler, authenticator, ratelimiter, server_name, + room_list_handler): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter @@ -492,6 +493,7 @@ class OpenIdUserInfo(BaseFederationServlet): def _wrap(self, code): return code + class PublicRoomList(BaseFederationServlet): """ Fetch the public room list for this server. -- cgit 1.5.1 From 195254cae80f4748c3fc0ac3b46000047c2e6cc0 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 1 Jun 2016 11:14:16 +0100 Subject: Inject fake room list handler in tests Otherwise it tries to start the remote public room list updating looping call which breaks. --- tests/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/utils.py b/tests/utils.py index 59d985b5f2..006abedbc1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -67,6 +67,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): version_string="Synapse/tests", database_engine=create_engine(config.database_config), get_db_conn=db_pool.get_db_conn, + room_list_handler=object(), **kargs ) hs.setup() @@ -75,6 +76,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): name, db_pool=None, datastore=datastore, config=config, version_string="Synapse/tests", database_engine=create_engine(config.database_config), + room_list_handler=object(), **kargs ) -- cgit 1.5.1 From d60eed07109f61ebe2120e1eb566e5bb7095fbad Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 1 Jun 2016 11:45:43 +0100 Subject: Limit number of notifications in an email notification --- synapse/storage/event_push_actions.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 4dae51a172..940e11d7a2 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -119,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def get_unread_push_actions_for_user_in_range(self, user_id, min_stream_ordering, - max_stream_ordering=None): + max_stream_ordering=None, + limit=20): def get_after_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " @@ -151,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore): if max_stream_ordering is not None: sql += " AND ep.stream_ordering <= ?" args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering ASC" + sql += " ORDER BY ep.stream_ordering ASC LIMIT ?" + args.append(limit) txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( -- cgit 1.5.1 From b06b10c8e3b3ad890886dea7b948eae03505d71a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 1 Jun 2016 15:57:46 +0100 Subject: Add infrastructure to the presence handler to track sync requests in external processes --- synapse/handlers/presence.py | 76 +++++++++++++++++++++++++++++++++++------ tests/handlers/test_presence.py | 16 ++++----- 2 files changed, 72 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 37f57301fb..047da7ff25 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -158,10 +158,21 @@ class PresenceHandler(object): self.serial_to_user = {} self._next_serial = 1 - # Keeps track of the number of *ongoing* syncs. While this is non zero - # a user will never go offline. + # Keeps track of the number of *ongoing* syncs on this process. While + # this is non zero a user will never go offline. self.user_to_num_current_syncs = {} + # Keeps track of the number of *ongoing* syncs on other processes. + # While any sync is ongoing on another process the user will never + # go offline. + # Each process has a unique identifier and an update frequency. If + # no update is received from that process within the update period then + # we assume that all the sync requests on that process have stopped. + # Stored as a dict from process_id to set of user_id, and a dict of + # process_id to millisecond timestamp last updated. + self.external_process_to_current_syncs = {} + self.external_process_last_updated_ms = [] + # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. @@ -286,7 +297,7 @@ class PresenceHandler(object): changes = handle_timeouts( states, is_mine_fn=self.is_mine_id, - user_to_num_current_syncs=self.user_to_num_current_syncs, + syncing_users=self.get_syncing_users(), now=now, ) @@ -363,6 +374,51 @@ class PresenceHandler(object): defer.returnValue(_user_syncing()) + def get_currently_syncing_users(self): + syncing_user_ids = { + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count + } + syncing_user_ids.update(self.external_process_to_current_syncs.values()) + return syncing_user_ids + + @defer.inlineCallbacks + def update_external_syncs(self, process_id, syncing_user_ids): + time_now_ms = self.clock.time_msec() + prev_syncing_user_ids = ( + self.external_process_to_current_syncs.get(process_id, set()) + ) + prev_states = yield self.current_state_for_users( + syncing_user_ids + prev_syncing_user_ids + ) + updates = [] + + for new_user_id in syncing_user_ids - prev_syncing_user_ids: + prev_state = prev_states[new_user_id] + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=time_now_ms, + last_user_sync_ts=time_now_ms, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + for old_user_id in prev_syncing_user_ids: + prev_state = prev_states[old_user_id] + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + yield self._update_states(updates) + + self.external_process_last_updated_ms = time_now_ms + self.external_process_to_current_syncs[process_id] = syncing_user_ids + if not syncing_user_ids: + del self.external_process_to_current_syncs[process_id] + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. @@ -935,15 +991,14 @@ class PresenceEventSource(object): return self.get_new_events(user, from_key=None, include_offline=False) -def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): +def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): """Checks the presence of users that have timed out and updates as appropriate. Args: user_states(list): List of UserPresenceState's to check. is_mine_fn (fn): Function that returns if a user_id is ours - user_to_num_current_syncs (dict): Mapping of user_id to number of currently - active syncs. + syncing_user_ids (set): Set of user_ids with active syncs. now (int): Current time in ms. Returns: @@ -954,21 +1009,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): for state in user_states: is_mine = is_mine_fn(state.user_id) - new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) + new_state = handle_timeout(state, is_mine, syncing_user_ids, now) if new_state: changes[state.user_id] = new_state return changes.values() -def handle_timeout(state, is_mine, user_to_num_current_syncs, now): +def handle_timeout(state, is_mine, syncing_user_ids, now): """Checks the presence of the user to see if any of the timers have elapsed Args: state (UserPresenceState) is_mine (bool): Whether the user is ours - user_to_num_current_syncs (dict): Mapping of user_id to number of currently - active syncs. + syncing_user_ids (set): Set of user_ids with active syncs. now (int): Current time in ms. Returns: @@ -1002,7 +1056,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now): # If there are have been no sync for a while (and none ongoing), # set presence to offline - if not user_to_num_current_syncs.get(user_id, 0): + if user_id not in syncing_user_ids: if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: state = state.copy_and_replace( state=PresenceState.OFFLINE, diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 87c795fcfa..b531ba8540 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={ - user_id: 1, - }, now=now + state, is_mine=True, syncing_user_ids=set([user_id]), now=now ) self.assertIsNotNone(new_state) @@ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNone(new_state) @@ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=False, user_to_num_current_syncs={}, now=now + state, is_mine=False, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) -- cgit 1.5.1 From 5b6373c895cd5232292a63bbb83c71f272c18650 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 1 Jun 2016 17:15:35 +0100 Subject: Expire stale entries for dead external processes --- synapse/handlers/presence.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 047da7ff25..7ac31cf233 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -68,6 +68,10 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000 # How often to resend presence to remote servers FEDERATION_PING_INTERVAL = 25 * 60 * 1000 +# How long we will wait before assuming that the syncs from an external process +# are dead. +EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 + assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER @@ -283,13 +287,26 @@ class PresenceHandler(object): # Fetch the list of users that *may* have timed out. Things may have # changed since the timeout was set, so we won't necessarily have to # take any action. - users_to_check = self.wheel_timer.fetch(now) + users_to_check = set(self.wheel_timer.fetch(now)) + + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_update.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_to_current_syncs.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) states = [ self.user_to_current_state.get( user_id, UserPresenceState.default(user_id) ) - for user_id in set(users_to_check) + for user_id in users_to_check ] timers_fired_counter.inc_by(len(states)) -- cgit 1.5.1 From 06476ed4a9611c60f4cab5f4b9012301b32b9365 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 2 Jun 2016 11:26:49 +0100 Subject: Add an http endpoint for making users as syncing Add some docstrings and comments. --- synapse/handlers/presence.py | 30 ++++++++++++++--- synapse/replication/presence_resource.py | 58 ++++++++++++++++++++++++++++++++ synapse/replication/resource.py | 2 ++ 3 files changed, 86 insertions(+), 4 deletions(-) create mode 100644 synapse/replication/presence_resource.py diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7ac31cf233..21e3028d37 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -392,6 +392,10 @@ class PresenceHandler(object): defer.returnValue(_user_syncing()) def get_currently_syncing_users(self): + """Get the set of user ids that are currently syncing on this HS. + Returns: + set(str): A set of user_id strings. + """ syncing_user_ids = { user_id for user_id, count in self.user_to_num_current_syncs.items() if count @@ -401,15 +405,30 @@ class PresenceHandler(object): @defer.inlineCallbacks def update_external_syncs(self, process_id, syncing_user_ids): - time_now_ms = self.clock.time_msec() + """Update the syncing users for an external process + + Args: + process_id(str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + syncing_user_ids(set(str)): The set of user_ids that are + currently syncing on that server. + """ + + # Grab the previous list of user_ids that were syncing on that process prev_syncing_user_ids = ( self.external_process_to_current_syncs.get(process_id, set()) ) + # Grab the current presence state for both the users that are syncing + # now and the users that were syncing before this update. prev_states = yield self.current_state_for_users( syncing_user_ids + prev_syncing_user_ids ) updates = [] + time_now_ms = self.clock.time_msec() + # For each new user that is syncing check if we need to mark them as + # being online. for new_user_id in syncing_user_ids - prev_syncing_user_ids: prev_state = prev_states[new_user_id] if prev_state.state == PresenceState.OFFLINE: @@ -423,6 +442,9 @@ class PresenceHandler(object): last_user_sync_ts=time_now_ms, )) + # For each user that is still syncing or stopped syncing update the + # last sync time so that we will correctly apply the grace period when + # they stop syncing. for old_user_id in prev_syncing_user_ids: prev_state = prev_states[old_user_id] updates.append(prev_state.copy_and_replace( @@ -431,10 +453,10 @@ class PresenceHandler(object): yield self._update_states(updates) - self.external_process_last_updated_ms = time_now_ms + # Update the last updated time for the process. We expire the entries + # if we don't receive an update in the given timeframe. + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() self.external_process_to_current_syncs[process_id] = syncing_user_ids - if not syncing_user_ids: - del self.external_process_to_current_syncs[process_id] @defer.inlineCallbacks def current_state_for_user(self, user_id): diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py new file mode 100644 index 0000000000..5a1b1a8dae --- /dev/null +++ b/synapse/replication/presence_resource.py @@ -0,0 +1,58 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + + +class PresenceResource(Resource): + """ + HTTP endpoint for marking users as syncing. + + POST /_synapse/replication/presence HTTP/1.1 + Content-Type: application/json + + { + "process_id": "", + "syncing_users": [""] + } + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.version_string = hs.version_string + self.presence_handler = hs.get_presence_handler() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler() + @defer.inlineCallbacks + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + process_id = content["process_id"] + syncing_user_ids = content["syncing_users"] + + yield self.presence_handler.update_external_syncs( + process_id, set(syncing_user_ids) + ) + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 847f212a3d..8c2d487ff4 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -16,6 +16,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request from synapse.replication.pusher_resource import PusherResource +from synapse.replication.presence_resource import PresenceResource from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -115,6 +116,7 @@ class ReplicationResource(Resource): self.clock = hs.get_clock() self.putChild("remove_pushers", PusherResource(hs)) + self.putChild("syncing_users", PresenceResource(hs)) def render_GET(self, request): self._async_render_GET(request) -- cgit 1.5.1