From b9abf3e4e3c764d8a8f537a092b5802222cde012 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Aug 2016 11:48:23 +0100 Subject: Remove dead appservice code --- synapse/notifier.py | 41 ++--------------------------------------- 1 file changed, 2 insertions(+), 39 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 30883a0696..e4a25f2411 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -67,10 +67,8 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user_id, rooms, current_token, time_now_ms, - appservice=None): + def __init__(self, user_id, rooms, current_token, time_now_ms): self.user_id = user_id - self.appservice = appservice self.rooms = set(rooms) self.current_token = current_token self.last_notified_ms = time_now_ms @@ -107,11 +105,6 @@ class _NotifierUserStream(object): notifier.user_to_user_stream.pop(self.user_id) - if self.appservice: - notifier.appservice_to_user_streams.get( - self.appservice, set() - ).discard(self) - def count_listeners(self): return len(self.notify_deferred.observers()) @@ -142,7 +135,6 @@ class Notifier(object): def __init__(self, hs): self.user_to_user_stream = {} self.room_to_user_streams = {} - self.appservice_to_user_streams = {} self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() @@ -168,8 +160,6 @@ class Notifier(object): all_user_streams |= x for x in self.user_to_user_stream.values(): all_user_streams.add(x) - for x in self.appservice_to_user_streams.values(): - all_user_streams |= x return sum(stream.count_listeners() for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) @@ -182,10 +172,6 @@ class Notifier(object): "users", lambda: len(self.user_to_user_stream), ) - metrics.register_callback( - "appservices", - lambda: count(bool, self.appservice_to_user_streams.values()), - ) def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): @@ -230,20 +216,6 @@ class Notifier(object): # poke any interested application service. self.appservice_handler.notify_interested_services(event) - app_streams = set() - - for appservice in self.appservice_to_user_streams: - # TODO (kegan): Redundant appservice listener checks? - # App services will already be in the room_to_user_streams set, but - # that isn't enough. They need to be checked here in order to - # receive *invites* for users they are interested in. Does this - # make the room_to_user_streams check somewhat obselete? - if appservice.is_interested(event): - app_user_streams = self.appservice_to_user_streams.get( - appservice, set() - ) - app_streams |= app_user_streams - if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) @@ -251,11 +223,9 @@ class Notifier(object): "room_key", room_stream_id, users=extra_users, rooms=[event.room_id], - extra_streams=app_streams, ) - def on_new_event(self, stream_key, new_token, users=[], rooms=[], - extra_streams=set()): + def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. @@ -294,7 +264,6 @@ class Notifier(object): """ user_stream = self.user_to_user_stream.get(user_id) if user_stream is None: - appservice = yield self.store.get_app_service_by_user_id(user_id) current_token = yield self.event_sources.get_current_token() if room_ids is None: rooms = yield self.store.get_rooms_for_user(user_id) @@ -302,7 +271,6 @@ class Notifier(object): user_stream = _NotifierUserStream( user_id=user_id, rooms=room_ids, - appservice=appservice, current_token=current_token, time_now_ms=self.clock.time_msec(), ) @@ -477,11 +445,6 @@ class Notifier(object): s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - if user_stream.appservice: - self.appservice_to_user_stream.setdefault( - user_stream.appservice, set() - ).add(user_stream) - def _user_joined_room(self, user_id, room_id): new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: -- cgit 1.4.1 From 9da84a9a1ef0b88d2e6170d706425ab36431abda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Aug 2016 11:54:41 +0100 Subject: Make AppserviceHandler stream events from database This is for two reasons: 1. Suppresses duplicates correctly, as the notifier doesn't do any duplicate suppression. 2. Makes it easier to connect the AppserviceHandler to the replication stream. --- synapse/handlers/appservice.py | 65 +++++++++++++++------- synapse/notifier.py | 2 +- synapse/storage/appservice.py | 39 +++++++++++++ .../storage/schema/delta/34/appservice_stream.sql | 23 ++++++++ tests/handlers/test_appservice.py | 9 ++- 5 files changed, 113 insertions(+), 25 deletions(-) create mode 100644 synapse/storage/schema/delta/34/appservice_stream.sql (limited to 'synapse/notifier.py') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 79805cdc2e..84341b0d20 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure +from synapse.util.logcontext import preserve_fn import logging @@ -45,35 +46,57 @@ class ApplicationServicesHandler(object): self.clock = hs.get_clock() @defer.inlineCallbacks - def notify_interested_services(self, event): + def notify_interested_services(self, current_id): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. Args: - event(Event): The event to push out to interested services. + current_id(int): The current maximum ID. """ + services = yield self.store.get_app_services() + if not services: + return + with Measure(self.clock, "notify_interested_services"): - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - return # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as these - # user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - self.scheduler.submit_event_for_as(service, event) + upper_bound = current_id + limit = 100 + while True: + upper_bound, events = yield self.store.get_new_events_for_appservice( + upper_bound, limit + ) + + logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound) + + if not events: + break + + for event in events: + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + continue # no services need notifying + + # Do we know this user exists? If not, poke the user query API for + # all services which match that user regex. This needs to block as + # these user queries need to be made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + preserve_fn(self.scheduler.submit_event_for_as)(service, event) + + yield self.store.set_appservice_last_pos(upper_bound) + + if len(events) < limit: + break @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/notifier.py b/synapse/notifier.py index e4a25f2411..40a148994f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -214,7 +214,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - self.appservice_handler.notify_interested_services(event) + self.appservice_handler.notify_interested_services(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d1ee533fac..f0c88e05cd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return 0 else: return int(last_txn_id[0]) # select 'last_txn' col + + def set_appservice_last_pos(self, pos): + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) + ) + return self.runInteraction( + "set_appservice_last_pos", set_appservice_last_pos_txn + ) + + @defer.inlineCallbacks + def get_new_events_for_appservice(self, current_id, limit): + """Get all new evnets""" + + def get_new_events_for_appservice_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e, appservice_stream_position AS a" + " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_new_events_for_appservice", get_new_events_for_appservice_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) diff --git a/synapse/storage/schema/delta/34/appservice_stream.sql b/synapse/storage/schema/delta/34/appservice_stream.sql new file mode 100644 index 0000000000..69e16eda0f --- /dev/null +++ b/synapse/storage/schema/delta/34/appservice_stream.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS appservice_stream_position( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_ordering BIGINT, + CHECK (Lock='X') +); + +INSERT INTO appservice_stream_position (stream_ordering) + SELECT COALESCE(MAX(stream_ordering), 0) FROM events; diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 9c1e5cc67c..7fe88172c0 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -53,8 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): type="m.room.message", room_id="!foo:bar" ) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) self.mock_as_api.push = Mock() - yield self.handler.notify_interested_services(event) + yield self.handler.notify_interested_services(0) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) @@ -74,7 +75,8 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() self.mock_as_api.query_user = Mock() - yield self.handler.notify_interested_services(event) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) + yield self.handler.notify_interested_services(0) self.mock_as_api.query_user.assert_called_once_with( services[0], user_id ) @@ -96,7 +98,8 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() self.mock_as_api.query_user = Mock() - yield self.handler.notify_interested_services(event) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) + yield self.handler.notify_interested_services(0) self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been." -- cgit 1.4.1