summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/appservice/scheduler.py14
-rw-r--r--synapse/handlers/__init__.py14
-rw-r--r--synapse/handlers/appservice.py15
-rw-r--r--synapse/handlers/auth.py9
-rw-r--r--synapse/handlers/directory.py3
-rw-r--r--synapse/notifier.py10
-rw-r--r--synapse/rest/client/v1/room.py2
-rw-r--r--synapse/server.py20
-rw-r--r--synapse/storage/__init__.py16
-rw-r--r--synapse/storage/_base.py1
-rw-r--r--synapse/storage/event_push_actions.py89
-rw-r--r--synapse/storage/receipts.py2
-rw-r--r--tests/handlers/test_appservice.py6
13 files changed, 150 insertions, 51 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 47a4e9f864..9afc8fd754 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -56,22 +56,22 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class AppServiceScheduler(object):
+class ApplicationServiceScheduler(object):
     """ Public facing API for this module. Does the required DI to tie the
     components together. This also serves as the "event_pool", which in this
     case is a simple array.
     """
 
-    def __init__(self, clock, store, as_api):
-        self.clock = clock
-        self.store = store
-        self.as_api = as_api
+    def __init__(self, hs):
+        self.clock = hs.get_clock()
+        self.store = hs.get_datastore()
+        self.as_api = hs.get_application_service_api()
 
         def create_recoverer(service, callback):
-            return _Recoverer(clock, store, as_api, service, callback)
+            return _Recoverer(self.clock, self.store, self.as_api, service, callback)
 
         self.txn_ctrl = _TransactionController(
-            clock, store, as_api, create_recoverer
+            self.clock, self.store, self.as_api, create_recoverer
         )
         self.queuer = _ServiceQueuer(self.txn_ctrl)
 
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 9442ae6f1d..c0069e23d6 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,11 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.appservice.scheduler import AppServiceScheduler
-from synapse.appservice.api import ApplicationServiceApi
 from .register import RegistrationHandler
 from .room import (
-    RoomCreationHandler, RoomListHandler, RoomContextHandler,
+    RoomCreationHandler, RoomContextHandler,
 )
 from .room_member import RoomMemberHandler
 from .message import MessageHandler
@@ -26,7 +24,6 @@ from .federation import FederationHandler
 from .profile import ProfileHandler
 from .directory import DirectoryHandler
 from .admin import AdminHandler
-from .appservice import ApplicationServicesHandler
 from .auth import AuthHandler
 from .identity import IdentityHandler
 from .receipts import ReceiptsHandler
@@ -50,18 +47,9 @@ class Handlers(object):
         self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
-        self.room_list_handler = RoomListHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.receipts_handler = ReceiptsHandler(hs)
-        asapi = ApplicationServiceApi(hs)
-        self.appservice_handler = ApplicationServicesHandler(
-            hs, asapi, AppServiceScheduler(
-                clock=hs.get_clock(),
-                store=hs.get_datastore(),
-                as_api=asapi
-            )
-        )
         self.auth_handler = AuthHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 75fc74c797..051ccdb380 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.appservice import ApplicationService
-from synapse.types import UserID
 
 import logging
 
@@ -35,16 +34,13 @@ def log_failure(failure):
     )
 
 
-# NB: Purposefully not inheriting BaseHandler since that contains way too much
-# setup code which this handler does not need or use. This makes testing a lot
-# easier.
 class ApplicationServicesHandler(object):
 
-    def __init__(self, hs, appservice_api, appservice_scheduler):
+    def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.hs = hs
-        self.appservice_api = appservice_api
-        self.scheduler = appservice_scheduler
+        self.is_mine_id = hs.is_mine_id
+        self.appservice_api = hs.get_application_service_api()
+        self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
 
     @defer.inlineCallbacks
@@ -169,8 +165,7 @@ class ApplicationServicesHandler(object):
 
     @defer.inlineCallbacks
     def _is_unknown_user(self, user_id):
-        user = UserID.from_string(user_id)
-        if not self.hs.is_mine(user):
+        if not self.is_mine_id(user_id):
             # we don't know if they are unknown or not since it isn't one of our
             # users. We can't poke ASes.
             defer.returnValue(False)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 68d0d78fc6..26c865e171 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 from ._base import BaseHandler
 from synapse.api.constants import LoginType
 from synapse.types import UserID
-from synapse.api.errors import AuthError, LoginError, Codes
+from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
 from synapse.util.async import run_on_reactor
 
 from twisted.web.client import PartialDownloadError
@@ -563,7 +563,12 @@ class AuthHandler(BaseHandler):
 
         except_access_token_ids = [requester.access_token_id] if requester else []
 
-        yield self.store.user_set_password_hash(user_id, password_hash)
+        try:
+            yield self.store.user_set_password_hash(user_id, password_hash)
+        except StoreError as e:
+            if e.code == 404:
+                raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+            raise e
         yield self.store.user_delete_access_tokens(
             user_id, except_access_token_ids
         )
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8eeb225811..4bea7f2b19 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler):
         super(DirectoryHandler, self).__init__(hs)
 
         self.state = hs.get_state_handler()
+        self.appservice_handler = hs.get_application_service_handler()
 
         self.federation = hs.get_replication_layer()
         self.federation.register_query_handler(
@@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler):
         )
         if not result:
             # Query AS to see if it exists
-            as_handler = self.hs.get_handlers().appservice_handler
+            as_handler = self.appservice_handler
             result = yield as_handler.query_room_alias_exists(room_alias)
         defer.returnValue(result)
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 33b79c0ec7..cbec4d30ae 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -140,8 +140,6 @@ class Notifier(object):
     UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
 
     def __init__(self, hs):
-        self.hs = hs
-
         self.user_to_user_stream = {}
         self.room_to_user_streams = {}
         self.appservice_to_user_streams = {}
@@ -151,6 +149,8 @@ class Notifier(object):
         self.pending_new_room_events = []
 
         self.clock = hs.get_clock()
+        self.appservice_handler = hs.get_application_service_handler()
+        self.state_handler = hs.get_state_handler()
 
         hs.get_distributor().observe(
             "user_joined_room", self._user_joined_room
@@ -232,9 +232,7 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        self.hs.get_handlers().appservice_handler.notify_interested_services(
-            event
-        )
+        self.appservice_handler.notify_interested_services(event)
 
         app_streams = set()
 
@@ -449,7 +447,7 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def _is_world_readable(self, room_id):
-        state = yield self.hs.get_state_handler().get_current_state(
+        state = yield self.state_handler.get_current_state(
             room_id,
             EventTypes.RoomHistoryVisibility
         )
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 644aa4e513..2d22bbdaa3 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -279,7 +279,7 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request):
-        handler = self.handlers.room_list_handler
+        handler = self.hs.get_room_list_handler()
         data = yield handler.get_public_room_list()
         defer.returnValue((200, data))
 
diff --git a/synapse/server.py b/synapse/server.py
index 01f828819f..7cf22b1eea 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -22,6 +22,8 @@
 from twisted.web.client import BrowserLikePolicyForHTTPS
 from twisted.enterprise import adbapi
 
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.appservice.api import ApplicationServiceApi
 from synapse.federation import initialize_http_replication
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.notifier import Notifier
@@ -30,6 +32,8 @@ from synapse.handlers import Handlers
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
+from synapse.handlers.room import RoomListHandler
+from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.state import StateHandler
 from synapse.storage import DataStore
 from synapse.util import Clock
@@ -84,6 +88,10 @@ class HomeServer(object):
         'presence_handler',
         'sync_handler',
         'typing_handler',
+        'room_list_handler',
+        'application_service_api',
+        'application_service_scheduler',
+        'application_service_handler',
         'notifier',
         'distributor',
         'client_resource',
@@ -179,6 +187,18 @@ class HomeServer(object):
     def build_sync_handler(self):
         return SyncHandler(self)
 
+    def build_room_list_handler(self):
+        return RoomListHandler(self)
+
+    def build_application_service_api(self):
+        return ApplicationServiceApi(self)
+
+    def build_application_service_scheduler(self):
+        return ApplicationServiceScheduler(self)
+
+    def build_application_service_handler(self):
+        return ApplicationServicesHandler(self)
+
     def build_event_sources(self):
         return EventSources(self)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d970fde9e8..8581796b7e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
-from ._base import Cache
+from ._base import Cache, LoggingTransaction
 from .directory import DirectoryStore
 from .events import EventsStore
 from .presence import PresenceStore, UserPresenceState
@@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
     def __init__(self, db_conn, hs):
         self.hs = hs
+        self._clock = hs.get_clock()
         self.database_engine = hs.database_engine
 
         self.client_ip_last_seen = Cache(
@@ -173,6 +174,19 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=push_rules_prefill,
         )
 
+        cur = LoggingTransaction(
+            db_conn.cursor(),
+            name="_find_stream_orderings_for_times_txn",
+            database_engine=self.database_engine,
+            after_callbacks=[]
+        )
+        self._find_stream_orderings_for_times_txn(cur)
+        cur.close()
+
+        self.find_stream_orderings_looping_call = self._clock.looping_call(
+            self._find_stream_orderings_for_times, 60 * 60 * 1000
+        )
+
         super(DataStore, self).__init__(hs)
 
     def take_presence_startup_info(self):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e0d7098692..56a0dd80f3 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -153,7 +153,6 @@ class SQLBaseStore(object):
     def __init__(self, hs):
         self.hs = hs
         self._db_pool = hs.get_db_pool()
-        self._clock = hs.get_clock()
 
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 9705db5c47..4dae51a172 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
 
 
 class EventPushActionsStore(SQLBaseStore):
+    def __init__(self, hs):
+        self.stream_ordering_month_ago = None
+        super(EventPushActionsStore, self).__init__(hs)
+
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
         Args:
@@ -224,18 +228,93 @@ class EventPushActionsStore(SQLBaseStore):
             (room_id, event_id)
         )
 
-    def _remove_push_actions_before_txn(self, txn, room_id, user_id,
-                                        topological_ordering):
+    def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+                                            topological_ordering):
+        """
+        Purges old, stale push actions for a user and room before a given
+        topological_ordering
+        Args:
+            txn: The transcation
+            room_id: Room ID to delete from
+            user_id: user ID to delete for
+            topological_ordering: The lowest topological ordering which will
+                                  not be deleted.
+        """
         txn.call_after(
             self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
             (room_id, user_id, )
         )
+
+        # We need to join on the events table to get the received_ts for
+        # event_push_actions and sqlite won't let us use a join in a delete so
+        # we can't just delete where received_ts < x. Furthermore we can
+        # only identify event_push_actions by a tuple of room_id, event_id
+        # we we can't use a subquery.
+        # Instead, we look up the stream ordering for the last event in that
+        # room received before the threshold time and delete event_push_actions
+        # in the room with a stream_odering before that.
         txn.execute(
-            "DELETE FROM event_push_actions"
-            " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
-            (room_id, user_id, topological_ordering,)
+            "DELETE FROM event_push_actions "
+            " WHERE user_id = ? AND room_id = ? AND "
+            " topological_ordering < ? AND stream_ordering < ?",
+            (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+        )
+
+    @defer.inlineCallbacks
+    def _find_stream_orderings_for_times(self):
+        yield self.runInteraction(
+            "_find_stream_orderings_for_times",
+            self._find_stream_orderings_for_times_txn
+        )
+
+    def _find_stream_orderings_for_times_txn(self, txn):
+        logger.info("Searching for stream ordering 1 month ago")
+        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 month ago: it's %d",
+            self.stream_ordering_month_ago
         )
 
+    def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+        """
+        Find the stream_ordering of the first event that was received after
+        a given timestamp. This is relatively slow as there is no index on
+        received_ts but we can then use this to delete push actions before
+        this.
+
+        received_ts must necessarily be in the same order as stream_ordering
+        and stream_ordering is indexed, so we manually binary search using
+        stream_ordering
+        """
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        max_stream_ordering = txn.fetchone()[0]
+
+        if max_stream_ordering is None:
+            return 0
+
+        range_start = 0
+        range_end = max_stream_ordering
+
+        sql = (
+            "SELECT received_ts FROM events"
+            " WHERE stream_ordering > ?"
+            " ORDER BY stream_ordering"
+            " LIMIT 1"
+        )
+
+        while range_end - range_start > 1:
+            middle = int((range_end + range_start) / 2)
+            txn.execute(sql, (middle,))
+            middle_ts = txn.fetchone()[0]
+            if ts > middle_ts:
+                range_start = middle
+            else:
+                range_end = middle
+
+        return range_end
+
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fdcf28f3e1..f1774f0e44 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore):
         )
 
         if receipt_type == "m.read" and topological_ordering:
-            self._remove_push_actions_before_txn(
+            self._remove_old_push_actions_before_txn(
                 txn,
                 room_id=room_id,
                 user_id=user_id,
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 7ddbbb9b4a..a884c95f8d 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -30,9 +30,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.mock_scheduler = Mock()
         hs = Mock()
         hs.get_datastore = Mock(return_value=self.mock_store)
-        self.handler = ApplicationServicesHandler(
-            hs, self.mock_as_api, self.mock_scheduler
-        )
+        hs.get_application_service_api = Mock(return_value=self.mock_as_api)
+        hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
+        self.handler = ApplicationServicesHandler(hs)
 
     @defer.inlineCallbacks
     def test_notify_interested_services(self):