From 4e1cebd56f9688d49a51929264c095356005f9a3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Aug 2016 15:31:44 +0100 Subject: Make synchrotron accept /events --- synapse/handlers/__init__.py | 3 --- synapse/handlers/presence.py | 27 +++++++++++++++++++-------- 2 files changed, 19 insertions(+), 11 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 1a50a2ec98..63d05f2531 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -19,7 +19,6 @@ from .room import ( ) from .room_member import RoomMemberHandler from .message import MessageHandler -from .events import EventStreamHandler, EventHandler from .federation import FederationHandler from .profile import ProfileHandler from .directory import DirectoryHandler @@ -53,8 +52,6 @@ class Handlers(object): self.message_handler = MessageHandler(hs) self.room_creation_handler = RoomCreationHandler(hs) self.room_member_handler = RoomMemberHandler(hs) - self.event_stream_handler = EventStreamHandler(hs) - self.event_handler = EventHandler(hs) self.federation_handler = FederationHandler(hs) self.profile_handler = ProfileHandler(hs) self.directory_handler = DirectoryHandler(hs) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2293b5fdf7..6a1fe76c88 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -503,7 +503,7 @@ class PresenceHandler(object): defer.returnValue(states) @defer.inlineCallbacks - def _get_interested_parties(self, states): + def _get_interested_parties(self, states, calculate_remote_hosts=True): """Given a list of states return which entities (rooms, users, servers) are interested in the given states. @@ -526,14 +526,15 @@ class PresenceHandler(object): users_to_states.setdefault(state.user_id, []).append(state) hosts_to_states = {} - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue + if calculate_remote_hosts: + for room_id, states in room_ids_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue - hosts = yield self.store.get_joined_hosts_for_room(room_id) - for host in hosts: - hosts_to_states.setdefault(host, []).extend(local_states) + hosts = yield self.store.get_joined_hosts_for_room(room_id) + for host in hosts: + hosts_to_states.setdefault(host, []).extend(local_states) for user_id, states in users_to_states.items(): local_states = filter(lambda s: self.is_mine_id(s.user_id), states) @@ -565,6 +566,16 @@ class PresenceHandler(object): self._push_to_remotes(hosts_to_states) + @defer.inlineCallbacks + def notify_for_states(self, state, stream_id): + parties = yield self._get_interested_parties([state]) + room_ids_to_states, users_to_states, hosts_to_states = parties + + self.notifier.on_new_event( + "presence_key", stream_id, rooms=room_ids_to_states.keys(), + users=[UserID.from_string(u) for u in users_to_states.keys()] + ) + def _push_to_remotes(self, hosts_to_states): """Sends state updates to remote servers. -- cgit 1.4.1 From dc3a00f24f301ab08750fdd8ca6ae040ba290e1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 17:04:39 +0100 Subject: Refactor user_delete_access_tokens. Invalidate get_user_by_access_token to slaves. --- synapse/handlers/auth.py | 6 ++-- synapse/push/pusherpool.py | 8 ++--- synapse/storage/registration.py | 70 +++++++++++++++++++---------------------- 3 files changed, 39 insertions(+), 45 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index a582d6334b..6986930c0d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -741,7 +741,7 @@ class AuthHandler(BaseHandler): def set_password(self, user_id, newpassword, requester=None): password_hash = self.hash(newpassword) - except_access_token_ids = [requester.access_token_id] if requester else [] + except_access_token_id = requester.access_token_id if requester else None try: yield self.store.user_set_password_hash(user_id, password_hash) @@ -750,10 +750,10 @@ class AuthHandler(BaseHandler): raise SynapseError(404, "Unknown user", Codes.NOT_FOUND) raise e yield self.store.user_delete_access_tokens( - user_id, except_access_token_ids + user_id, except_access_token_id ) yield self.hs.get_pusherpool().remove_pushers_by_user( - user_id, except_access_token_ids + user_id, except_access_token_id ) @defer.inlineCallbacks diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 5853ec36a9..54c0f1b849 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -102,14 +102,14 @@ class PusherPool: yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user(self, user_id, except_token_ids=[]): + def remove_pushers_by_user(self, user_id, except_access_token_id=None): all = yield self.store.get_all_pushers() logger.info( - "Removing all pushers for user %s except access tokens ids %r", - user_id, except_token_ids + "Removing all pushers for user %s except access tokens id %r", + user_id, except_access_token_id ) for p in all: - if p['user_name'] == user_id and p['access_token'] not in except_token_ids: + if p['user_name'] == user_id and p['access_token'] != except_access_token_id: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7e7d32eb66..19cb3b31c6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -251,7 +251,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[], + def user_delete_access_tokens(self, user_id, except_token_id=None, device_id=None, delete_refresh_tokens=False): """ @@ -259,7 +259,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: user_id (str): ID of user the tokens belong to - except_token_ids (list[str]): list of access_tokens which should + except_token_id (str): list of access_tokens IDs which should *not* be deleted device_id (str|None): ID of device the tokens are associated with. If None, tokens associated with any device (or no device) will @@ -269,53 +269,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Returns: defer.Deferred: """ - def f(txn, table, except_tokens, call_after_delete): - sql = "SELECT token FROM %s WHERE user_id = ?" % table - clauses = [user_id] - + def f(txn): + keyvalues = { + "user_id": user_id, + } if device_id is not None: - sql += " AND device_id = ?" - clauses.append(device_id) + keyvalues["device_id"] = device_id - if except_tokens: - sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_tokens]), + if delete_refresh_tokens: + self._simple_delete_txn( + txn, + table="refresh_tokens", + keyvalues=keyvalues, ) - clauses += except_tokens - - txn.execute(sql, clauses) - rows = txn.fetchall() + items = keyvalues.items() + where_clause = " AND ".join(k + " = ?" for k, _ in items) + values = [v for _, v in items] + if except_token_id: + where_clause += " AND id != ?" + values.append(except_token_id) - n = 100 - chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] - for chunk in chunks: - if call_after_delete: - for row in chunk: - txn.call_after(call_after_delete, (row[0],)) + txn.execute( + "SELECT token FROM access_tokens WHERE %s" % where_clause, + values + ) + rows = self.cursor_to_dict(txn) - txn.execute( - "DELETE FROM %s WHERE token in (%s)" % ( - table, - ",".join(["?" for _ in chunk]), - ), [r[0] for r in chunk] + for row in rows: + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (row["token"],) ) - # delete refresh tokens first, to stop new access tokens being - # allocated while our backs are turned - if delete_refresh_tokens: - yield self.runInteraction( - "user_delete_access_tokens", f, - table="refresh_tokens", - except_tokens=[], - call_after_delete=None, + txn.execute( + "DELETE FROM access_tokens WHERE %s" % where_clause, + values ) yield self.runInteraction( "user_delete_access_tokens", f, - table="access_tokens", - except_tokens=except_token_ids, - call_after_delete=self.get_user_by_access_token.invalidate, ) def delete_access_token(self, access_token): @@ -328,7 +320,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): }, ) - txn.call_after(self.get_user_by_access_token.invalidate, (access_token,)) + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (access_token,) + ) return self.runInteraction("delete_access_token", f) -- cgit 1.4.1 From 2ee1bd124c2d9ac7f11d0b32c9e40a842db14fe9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Aug 2016 11:34:36 +0100 Subject: Limit number of extremeties in backfill request This works around a bug where if we make a backfill request with too many extremeties it causes the request URI to be too long. --- synapse/handlers/federation.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ff6bb475b5..328f8f4842 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -274,7 +274,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def backfill(self, dest, room_id, limit, extremities=[]): + def backfill(self, dest, room_id, limit, extremities): """ Trigger a backfill request to `dest` for the given `room_id` This will attempt to get more events from the remote. This may return @@ -284,9 +284,6 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") - if not extremities: - extremities = yield self.store.get_oldest_events_in_room(room_id) - events = yield self.replication_layer.backfill( dest, room_id, @@ -455,6 +452,10 @@ class FederationHandler(BaseHandler): ) max_depth = sorted_extremeties_tuple[0][1] + # We don't want to specify too many extremities as it causes the backfill + # request URI to be too long. + extremities = dict(sorted_extremeties_tuple[:5]) + if current_depth > max_depth: logger.debug( "Not backfilling as we don't need to. %d < %d", -- cgit 1.4.1 From 62c5245c8753de306b728f2917f03e786982188f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Aug 2016 11:12:29 +0100 Subject: Measure notify_interested_services --- synapse/handlers/appservice.py | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 051ccdb380..48feae07b5 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService +from synapse.util.metrics import Measure import logging @@ -42,6 +43,7 @@ class ApplicationServicesHandler(object): self.appservice_api = hs.get_application_service_api() self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False + self.clock = hs.get_clock() @defer.inlineCallbacks def notify_interested_services(self, event): @@ -53,25 +55,26 @@ class ApplicationServicesHandler(object): Args: event(Event): The event to push out to interested services. """ - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - return # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as these - # user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - self.scheduler.submit_event_for_as(service, event) + with Measure(self.clock, "notify_interested_services"): + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + return # no services need notifying + + # Do we know this user exists? If not, poke the user query API for + # all services which match that user regex. This needs to block as these + # user queries need to be made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + self.scheduler.submit_event_for_as(service, event) @defer.inlineCallbacks def query_user_exists(self, user_id): -- cgit 1.4.1 From 320dfe523c9496822f597f870dfae9f8ec77d54d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Aug 2016 17:20:50 +0100 Subject: Make notify_interested_services faster --- synapse/appservice/__init__.py | 81 ++++++++++++++------------- synapse/handlers/appservice.py | 31 +++------- tests/appservice/test_appservice.py | 109 +++++++++++++++--------------------- tests/handlers/test_appservice.py | 13 ++++- 4 files changed, 104 insertions(+), 130 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index f7178ea0d3..b1b91d0a55 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -14,6 +14,8 @@ # limitations under the License. from synapse.api.constants import EventTypes +from twisted.internet import defer + import logging import re @@ -138,65 +140,66 @@ class ApplicationService(object): return regex_obj["exclusive"] return False - def _matches_user(self, event, member_list): - if (hasattr(event, "sender") and - self.is_interested_in_user(event.sender)): - return True + @defer.inlineCallbacks + def _matches_user(self, event, store): + if not event: + defer.returnValue(False) + + if self.is_interested_in_user(event.sender): + defer.returnValue(True) # also check m.room.member state key - if (hasattr(event, "type") and event.type == EventTypes.Member - and hasattr(event, "state_key") - and self.is_interested_in_user(event.state_key)): - return True + if (event.type == EventTypes.Member and + self.is_interested_in_user(event.state_key)): + defer.returnValue(True) + + if not store: + defer.returnValue(False) + + member_list = yield store.get_users_in_room(event.room_id) + # check joined member events for user_id in member_list: if self.is_interested_in_user(user_id): - return True - return False + defer.returnValue(True) + defer.returnValue(False) def _matches_room_id(self, event): if hasattr(event, "room_id"): return self.is_interested_in_room(event.room_id) return False - def _matches_aliases(self, event, alias_list): + @defer.inlineCallbacks + def _matches_aliases(self, event, store): + if not store or not event: + defer.returnValue(False) + + alias_list = yield store.get_aliases_for_room(event.room_id) for alias in alias_list: if self.is_interested_in_alias(alias): - return True - return False + defer.returnValue(True) + defer.returnValue(False) - def is_interested(self, event, restrict_to=None, aliases_for_event=None, - member_list=None): + @defer.inlineCallbacks + def is_interested(self, event, store=None): """Check if this service is interested in this event. Args: event(Event): The event to check. - restrict_to(str): The namespace to restrict regex tests to. - aliases_for_event(list): A list of all the known room aliases for - this event. - member_list(list): A list of all joined user_ids in this room. + store(DataStore) Returns: bool: True if this service would like to know about this event. """ - if aliases_for_event is None: - aliases_for_event = [] - if member_list is None: - member_list = [] - - if restrict_to and restrict_to not in ApplicationService.NS_LIST: - # this is a programming error, so fail early and raise a general - # exception - raise Exception("Unexpected restrict_to value: %s". restrict_to) - - if not restrict_to: - return (self._matches_user(event, member_list) - or self._matches_aliases(event, aliases_for_event) - or self._matches_room_id(event)) - elif restrict_to == ApplicationService.NS_ALIASES: - return self._matches_aliases(event, aliases_for_event) - elif restrict_to == ApplicationService.NS_ROOMS: - return self._matches_room_id(event) - elif restrict_to == ApplicationService.NS_USERS: - return self._matches_user(event, member_list) + # Do cheap checks first + if self._matches_room_id(event): + defer.returnValue(True) + + if (yield self._matches_aliases(event, store)): + defer.returnValue(True) + + if (yield self._matches_user(event, store)): + defer.returnValue(True) + + defer.returnValue(False) def is_interested_in_user(self, user_id): return ( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 48feae07b5..79805cdc2e 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -16,7 +16,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.appservice import ApplicationService from synapse.util.metrics import Measure import logging @@ -107,11 +106,12 @@ class ApplicationServicesHandler(object): association can be found. """ room_alias_str = room_alias.to_string() - alias_query_services = yield self._get_services_for_event( - event=None, - restrict_to=ApplicationService.NS_ALIASES, - alias_list=[room_alias_str] - ) + services = yield self.store.get_app_services() + alias_query_services = [ + s for s in services if ( + s.is_interested_in_alias(room_alias_str) + ) + ] for alias_service in alias_query_services: is_known_alias = yield self.appservice_api.query_alias( alias_service, room_alias_str @@ -124,34 +124,19 @@ class ApplicationServicesHandler(object): defer.returnValue(result) @defer.inlineCallbacks - def _get_services_for_event(self, event, restrict_to="", alias_list=None): + def _get_services_for_event(self, event): """Retrieve a list of application services interested in this event. Args: event(Event): The event to check. Can be None if alias_list is not. - restrict_to(str): The namespace to restrict regex tests to. - alias_list: A list of aliases to get services for. If None, this - list is obtained from the database. Returns: list: A list of services interested in this event based on the service regex. """ - member_list = None - if hasattr(event, "room_id"): - # We need to know the aliases associated with this event.room_id, - # if any. - if not alias_list: - alias_list = yield self.store.get_aliases_for_room( - event.room_id - ) - # We need to know the members associated with this event.room_id, - # if any. - member_list = yield self.store.get_users_in_room(event.room_id) - services = yield self.store.get_app_services() interested_list = [ s for s in services if ( - s.is_interested(event, restrict_to, alias_list, member_list) + yield s.is_interested(event, self.store) ) ] defer.returnValue(interested_list) diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index d6cc1881e9..aa8cc50550 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -14,6 +14,8 @@ # limitations under the License. from synapse.appservice import ApplicationService +from twisted.internet import defer + from mock import Mock from tests import unittest @@ -42,20 +44,25 @@ class ApplicationServiceTestCase(unittest.TestCase): type="m.something", room_id="!foo:bar", sender="@someone:somewhere" ) + self.store = Mock() + + @defer.inlineCallbacks def test_regex_user_id_prefix_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") ) self.event.sender = "@irc_foobar:matrix.org" - self.assertTrue(self.service.is_interested(self.event)) + self.assertTrue((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_user_id_prefix_no_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") ) self.event.sender = "@someone_else:matrix.org" - self.assertFalse(self.service.is_interested(self.event)) + self.assertFalse((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_room_member_is_checked(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") @@ -63,30 +70,36 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.sender = "@someone_else:matrix.org" self.event.type = "m.room.member" self.event.state_key = "@irc_foobar:matrix.org" - self.assertTrue(self.service.is_interested(self.event)) + self.assertTrue((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_room_id_match(self): self.service.namespaces[ApplicationService.NS_ROOMS].append( _regex("!some_prefix.*some_suffix:matrix.org") ) self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org" - self.assertTrue(self.service.is_interested(self.event)) + self.assertTrue((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_room_id_no_match(self): self.service.namespaces[ApplicationService.NS_ROOMS].append( _regex("!some_prefix.*some_suffix:matrix.org") ) self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org" - self.assertFalse(self.service.is_interested(self.event)) + self.assertFalse((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_alias_match(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( _regex("#irc_.*:matrix.org") ) - self.assertTrue(self.service.is_interested( - self.event, - aliases_for_event=["#irc_foobar:matrix.org", "#athing:matrix.org"] - )) + self.store.get_aliases_for_room.return_value = [ + "#irc_foobar:matrix.org", "#athing:matrix.org" + ] + self.store.get_users_in_room.return_value = [] + self.assertTrue((yield self.service.is_interested( + self.event, self.store + ))) def test_non_exclusive_alias(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( @@ -136,15 +149,20 @@ class ApplicationServiceTestCase(unittest.TestCase): "!irc_foobar:matrix.org" )) + @defer.inlineCallbacks def test_regex_alias_no_match(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( _regex("#irc_.*:matrix.org") ) - self.assertFalse(self.service.is_interested( - self.event, - aliases_for_event=["#xmpp_foobar:matrix.org", "#athing:matrix.org"] - )) + self.store.get_aliases_for_room.return_value = [ + "#xmpp_foobar:matrix.org", "#athing:matrix.org" + ] + self.store.get_users_in_room.return_value = [] + self.assertFalse((yield self.service.is_interested( + self.event, self.store + ))) + @defer.inlineCallbacks def test_regex_multiple_matches(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( _regex("#irc_.*:matrix.org") @@ -153,53 +171,13 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("@irc_.*") ) self.event.sender = "@irc_foobar:matrix.org" - self.assertTrue(self.service.is_interested( - self.event, - aliases_for_event=["#irc_barfoo:matrix.org"] - )) - - def test_restrict_to_rooms(self): - self.service.namespaces[ApplicationService.NS_ROOMS].append( - _regex("!flibble_.*:matrix.org") - ) - self.service.namespaces[ApplicationService.NS_USERS].append( - _regex("@irc_.*") - ) - self.event.sender = "@irc_foobar:matrix.org" - self.event.room_id = "!wibblewoo:matrix.org" - self.assertFalse(self.service.is_interested( - self.event, - restrict_to=ApplicationService.NS_ROOMS - )) - - def test_restrict_to_aliases(self): - self.service.namespaces[ApplicationService.NS_ALIASES].append( - _regex("#xmpp_.*:matrix.org") - ) - self.service.namespaces[ApplicationService.NS_USERS].append( - _regex("@irc_.*") - ) - self.event.sender = "@irc_foobar:matrix.org" - self.assertFalse(self.service.is_interested( - self.event, - restrict_to=ApplicationService.NS_ALIASES, - aliases_for_event=["#irc_barfoo:matrix.org"] - )) - - def test_restrict_to_senders(self): - self.service.namespaces[ApplicationService.NS_ALIASES].append( - _regex("#xmpp_.*:matrix.org") - ) - self.service.namespaces[ApplicationService.NS_USERS].append( - _regex("@irc_.*") - ) - self.event.sender = "@xmpp_foobar:matrix.org" - self.assertFalse(self.service.is_interested( - self.event, - restrict_to=ApplicationService.NS_USERS, - aliases_for_event=["#xmpp_barfoo:matrix.org"] - )) + self.store.get_aliases_for_room.return_value = ["#irc_barfoo:matrix.org"] + self.store.get_users_in_room.return_value = [] + self.assertTrue((yield self.service.is_interested( + self.event, self.store + ))) + @defer.inlineCallbacks def test_interested_in_self(self): # make sure invites get through self.service.sender = "@appservice:name" @@ -211,20 +189,21 @@ class ApplicationServiceTestCase(unittest.TestCase): "membership": "invite" } self.event.state_key = self.service.sender - self.assertTrue(self.service.is_interested(self.event)) + self.assertTrue((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_member_list_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") ) - join_list = [ + self.store.get_users_in_room.return_value = [ "@alice:here", "@irc_fo:here", # AS user "@bob:here", ] + self.store.get_aliases_for_room.return_value = [] self.event.sender = "@xmpp_foobar:matrix.org" - self.assertTrue(self.service.is_interested( - event=self.event, - member_list=join_list - )) + self.assertTrue((yield self.service.is_interested( + event=self.event, store=self.store + ))) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 3116951472..9c1e5cc67c 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -110,11 +110,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): room_id = "!alpha:bet" servers = ["aperture"] - interested_service = self._mkservice(is_interested=True) + interested_service = self._mkservice_alias(is_interested_in_alias=True) services = [ - self._mkservice(is_interested=False), + self._mkservice_alias(is_interested_in_alias=False), interested_service, - self._mkservice(is_interested=False) + self._mkservice_alias(is_interested_in_alias=False) ] self.mock_store.get_app_services = Mock(return_value=services) @@ -137,3 +137,10 @@ class AppServiceHandlerTestCase(unittest.TestCase): service.token = "mock_service_token" service.url = "mock_service_url" return service + + def _mkservice_alias(self, is_interested_in_alias): + service = Mock() + service.is_interested_in_alias = Mock(return_value=is_interested_in_alias) + service.token = "mock_service_token" + service.url = "mock_service_url" + return service -- cgit 1.4.1 From 9da84a9a1ef0b88d2e6170d706425ab36431abda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Aug 2016 11:54:41 +0100 Subject: Make AppserviceHandler stream events from database This is for two reasons: 1. Suppresses duplicates correctly, as the notifier doesn't do any duplicate suppression. 2. Makes it easier to connect the AppserviceHandler to the replication stream. --- synapse/handlers/appservice.py | 65 +++++++++++++++------- synapse/notifier.py | 2 +- synapse/storage/appservice.py | 39 +++++++++++++ .../storage/schema/delta/34/appservice_stream.sql | 23 ++++++++ tests/handlers/test_appservice.py | 9 ++- 5 files changed, 113 insertions(+), 25 deletions(-) create mode 100644 synapse/storage/schema/delta/34/appservice_stream.sql (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 79805cdc2e..84341b0d20 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure +from synapse.util.logcontext import preserve_fn import logging @@ -45,35 +46,57 @@ class ApplicationServicesHandler(object): self.clock = hs.get_clock() @defer.inlineCallbacks - def notify_interested_services(self, event): + def notify_interested_services(self, current_id): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. Args: - event(Event): The event to push out to interested services. + current_id(int): The current maximum ID. """ + services = yield self.store.get_app_services() + if not services: + return + with Measure(self.clock, "notify_interested_services"): - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - return # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as these - # user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - self.scheduler.submit_event_for_as(service, event) + upper_bound = current_id + limit = 100 + while True: + upper_bound, events = yield self.store.get_new_events_for_appservice( + upper_bound, limit + ) + + logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound) + + if not events: + break + + for event in events: + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + continue # no services need notifying + + # Do we know this user exists? If not, poke the user query API for + # all services which match that user regex. This needs to block as + # these user queries need to be made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + preserve_fn(self.scheduler.submit_event_for_as)(service, event) + + yield self.store.set_appservice_last_pos(upper_bound) + + if len(events) < limit: + break @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/notifier.py b/synapse/notifier.py index e4a25f2411..40a148994f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -214,7 +214,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - self.appservice_handler.notify_interested_services(event) + self.appservice_handler.notify_interested_services(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d1ee533fac..f0c88e05cd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return 0 else: return int(last_txn_id[0]) # select 'last_txn' col + + def set_appservice_last_pos(self, pos): + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) + ) + return self.runInteraction( + "set_appservice_last_pos", set_appservice_last_pos_txn + ) + + @defer.inlineCallbacks + def get_new_events_for_appservice(self, current_id, limit): + """Get all new evnets""" + + def get_new_events_for_appservice_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e, appservice_stream_position AS a" + " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_new_events_for_appservice", get_new_events_for_appservice_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) diff --git a/synapse/storage/schema/delta/34/appservice_stream.sql b/synapse/storage/schema/delta/34/appservice_stream.sql new file mode 100644 index 0000000000..69e16eda0f --- /dev/null +++ b/synapse/storage/schema/delta/34/appservice_stream.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS appservice_stream_position( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_ordering BIGINT, + CHECK (Lock='X') +); + +INSERT INTO appservice_stream_position (stream_ordering) + SELECT COALESCE(MAX(stream_ordering), 0) FROM events; diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 9c1e5cc67c..7fe88172c0 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -53,8 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): type="m.room.message", room_id="!foo:bar" ) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) self.mock_as_api.push = Mock() - yield self.handler.notify_interested_services(event) + yield self.handler.notify_interested_services(0) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) @@ -74,7 +75,8 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() self.mock_as_api.query_user = Mock() - yield self.handler.notify_interested_services(event) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) + yield self.handler.notify_interested_services(0) self.mock_as_api.query_user.assert_called_once_with( services[0], user_id ) @@ -96,7 +98,8 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() self.mock_as_api.query_user = Mock() - yield self.handler.notify_interested_services(event) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) + yield self.handler.notify_interested_services(0) self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been." -- cgit 1.4.1