summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/room.py9
-rw-r--r--synapse/handlers/sync.py10
-rw-r--r--synapse/replication/slave/storage/directory.py8
-rw-r--r--synapse/rest/client/v1/admin.py58
-rw-r--r--synapse/storage/appservice.py82
-rw-r--r--synapse/storage/directory.py50
-rw-r--r--synapse/storage/stream.py103
8 files changed, 131 insertions, 203 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7d28c2745c..dd00d8a86c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -52,16 +52,12 @@ class MessageHandler(BaseHandler):
         self.pagination_lock = ReadWriteLock()
 
     @defer.inlineCallbacks
-    def purge_history(self, room_id, event_id, delete_local_events=False):
-        event = yield self.store.get_event(event_id)
-
-        if event.room_id != room_id:
-            raise SynapseError(400, "Event is for wrong room.")
-
-        depth = event.depth
-
+    def purge_history(self, room_id, topological_ordering,
+                      delete_local_events=False):
         with (yield self.pagination_lock.write(room_id)):
-            yield self.store.purge_history(room_id, depth, delete_local_events)
+            yield self.store.purge_history(
+                room_id, topological_ordering, delete_local_events,
+            )
 
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6c425828c1..8df8fcbbad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -475,12 +475,9 @@ class RoomEventSource(object):
             user.to_string()
         )
         if app_service:
-            events, end_key = yield self.store.get_appservice_room_stream(
-                service=app_service,
-                from_key=from_key,
-                to_key=to_key,
-                limit=limit,
-            )
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
         else:
             room_events = yield self.store.get_membership_changes_for_user(
                 user.to_string(), from_key, to_key
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b12988f3c9..56b86356f2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -998,8 +998,9 @@ class SyncHandler(object):
 
         app_service = self.store.get_app_service_by_user_id(user_id)
         if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
         else:
             joined_room_ids = yield self.store.get_rooms_for_user(user_id)
 
@@ -1030,8 +1031,9 @@ class SyncHandler(object):
 
         app_service = self.store.get_app_service_by_user_id(user_id)
         if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
         else:
             joined_room_ids = yield self.store.get_rooms_for_user(user_id)
 
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 7301d885f2..6deecd3963 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -14,10 +14,8 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage.directory import DirectoryStore
+from synapse.storage.directory import DirectoryWorkerStore
 
 
-class DirectoryStore(BaseSlavedStore):
-    get_aliases_for_room = DirectoryStore.__dict__[
-        "get_aliases_for_room"
-    ]
+class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 3917eee42d..dcf6215dad 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
-from synapse.api.errors import AuthError, SynapseError
+from synapse.api.errors import AuthError, SynapseError, Codes
 from synapse.types import UserID, create_requester
 from synapse.http.servlet import parse_json_object_from_request
 
@@ -114,12 +114,18 @@ class PurgeMediaCacheRestServlet(ClientV1RestServlet):
 
 class PurgeHistoryRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns(
-        "/admin/purge_history/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
+        "/admin/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
     )
 
     def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer)
+        """
         super(PurgeHistoryRestServlet, self).__init__(hs)
         self.handlers = hs.get_handlers()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_POST(self, request, room_id, event_id):
@@ -133,8 +139,54 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
 
         delete_local_events = bool(body.get("delete_local_events", False))
 
+        # establish the topological ordering we should keep events from. The
+        # user can provide an event_id in the URL or the request body, or can
+        # provide a timestamp in the request body.
+        if event_id is None:
+            event_id = body.get('purge_up_to_event_id')
+
+        if event_id is not None:
+            event = yield self.store.get_event(event_id)
+
+            if event.room_id != room_id:
+                raise SynapseError(400, "Event is for wrong room.")
+
+            depth = event.depth
+            logger.info(
+                "[purge] purging up to depth %i (event_id %s)",
+                depth, event_id,
+            )
+        elif 'purge_up_to_ts' in body:
+            ts = body['purge_up_to_ts']
+            if not isinstance(ts, int):
+                raise SynapseError(
+                    400, "purge_up_to_ts must be an int",
+                    errcode=Codes.BAD_JSON,
+                )
+
+            stream_ordering = (
+                yield self.store.find_first_stream_ordering_after_ts(ts)
+            )
+
+            (_, depth, _) = (
+                yield self.store.get_room_event_after_stream_ordering(
+                    room_id, stream_ordering,
+                )
+            )
+            logger.info(
+                "[purge] purging up to depth %i (received_ts %i => "
+                "stream_ordering %i)",
+                depth, ts, stream_ordering,
+            )
+        else:
+            raise SynapseError(
+                400,
+                "must specify purge_up_to_event_id or purge_up_to_ts",
+                errcode=Codes.BAD_JSON,
+            )
+
         yield self.handlers.message_handler.purge_history(
-            room_id, event_id,
+            room_id, depth,
             delete_local_events=delete_local_events,
         )
 
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 90fb51d43c..12ea8a158c 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -18,11 +18,9 @@ import re
 import simplejson as json
 from twisted.internet import defer
 
-from synapse.api.constants import Membership
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
 from synapse.storage.events import EventsWorkerStore
-from synapse.storage.roommember import RoomsForUser
 from ._base import SQLBaseStore
 
 
@@ -115,81 +113,11 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
 
 
 class ApplicationServiceStore(ApplicationServiceWorkerStore):
-
-    def __init__(self, db_conn, hs):
-        super(ApplicationServiceStore, self).__init__(db_conn, hs)
-        self.hostname = hs.hostname
-
-    def get_app_service_rooms(self, service):
-        """Get a list of RoomsForUser for this application service.
-
-        Application services may be "interested" in lots of rooms depending on
-        the room ID, the room aliases, or the members in the room. This function
-        takes all of these into account and returns a list of RoomsForUser which
-        represent the entire list of room IDs that this application service
-        wants to know about.
-
-        Args:
-            service: The application service to get a room list for.
-        Returns:
-            A list of RoomsForUser.
-        """
-        return self.runInteraction(
-            "get_app_service_rooms",
-            self._get_app_service_rooms_txn,
-            service,
-        )
-
-    def _get_app_service_rooms_txn(self, txn, service):
-        # get all rooms matching the room ID regex.
-        room_entries = self._simple_select_list_txn(
-            txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
-        )
-        matching_room_list = set([
-            r["room_id"] for r in room_entries if
-            service.is_interested_in_room(r["room_id"])
-        ])
-
-        # resolve room IDs for matching room alias regex.
-        room_alias_mappings = self._simple_select_list_txn(
-            txn=txn, table="room_aliases", keyvalues=None,
-            retcols=["room_id", "room_alias"]
-        )
-        matching_room_list |= set([
-            r["room_id"] for r in room_alias_mappings if
-            service.is_interested_in_alias(r["room_alias"])
-        ])
-
-        # get all rooms for every user for this AS. This is scoped to users on
-        # this HS only.
-        user_list = self._simple_select_list_txn(
-            txn=txn, table="users", keyvalues=None, retcols=["name"]
-        )
-        user_list = [
-            u["name"] for u in user_list if
-            service.is_interested_in_user(u["name"])
-        ]
-        rooms_for_user_matching_user_id = set()  # RoomsForUser list
-        for user_id in user_list:
-            # FIXME: This assumes this store is linked with RoomMemberStore :(
-            rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
-                txn=txn,
-                user_id=user_id,
-                membership_list=[Membership.JOIN]
-            )
-            rooms_for_user_matching_user_id |= set(rooms_for_user)
-
-        # make RoomsForUser tuples for room ids and aliases which are not in the
-        # main rooms_for_user_list - e.g. they are rooms which do not have AS
-        # registered users in it.
-        known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
-        missing_rooms_for_user = [
-            RoomsForUser(r, service.sender, "join") for r in
-            matching_room_list if r not in known_room_ids
-        ]
-        rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
-
-        return rooms_for_user_matching_user_id
+    # This is currently empty due to there not being any AS storage functions
+    # that can't be run on the workers. Since this may change in future, and
+    # to keep consistency with the other stores, we keep this empty class for
+    # now.
+    pass
 
 
 class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 79e7c540ad..d0c0059757 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -29,8 +29,7 @@ RoomAliasMapping = namedtuple(
 )
 
 
-class DirectoryStore(SQLBaseStore):
-
+class DirectoryWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_association_from_room_alias(self, room_alias):
         """ Get's the room_id and server list for a given room_alias
@@ -69,6 +68,28 @@ class DirectoryStore(SQLBaseStore):
             RoomAliasMapping(room_id, room_alias.to_string(), servers)
         )
 
+    def get_room_alias_creator(self, room_alias):
+        return self._simple_select_one_onecol(
+            table="room_aliases",
+            keyvalues={
+                "room_alias": room_alias,
+            },
+            retcol="creator",
+            desc="get_room_alias_creator",
+            allow_none=True
+        )
+
+    @cached(max_entries=5000)
+    def get_aliases_for_room(self, room_id):
+        return self._simple_select_onecol(
+            "room_aliases",
+            {"room_id": room_id},
+            "room_alias",
+            desc="get_aliases_for_room",
+        )
+
+
+class DirectoryStore(DirectoryWorkerStore):
     @defer.inlineCallbacks
     def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
         """ Creates an associatin between  a room alias and room_id/servers
@@ -116,17 +137,6 @@ class DirectoryStore(SQLBaseStore):
             )
         defer.returnValue(ret)
 
-    def get_room_alias_creator(self, room_alias):
-        return self._simple_select_one_onecol(
-            table="room_aliases",
-            keyvalues={
-                "room_alias": room_alias,
-            },
-            retcol="creator",
-            desc="get_room_alias_creator",
-            allow_none=True
-        )
-
     @defer.inlineCallbacks
     def delete_room_alias(self, room_alias):
         room_id = yield self.runInteraction(
@@ -135,7 +145,6 @@ class DirectoryStore(SQLBaseStore):
             room_alias,
         )
 
-        self.get_aliases_for_room.invalidate((room_id,))
         defer.returnValue(room_id)
 
     def _delete_room_alias_txn(self, txn, room_alias):
@@ -160,17 +169,12 @@ class DirectoryStore(SQLBaseStore):
             (room_alias.to_string(),)
         )
 
-        return room_id
-
-    @cached(max_entries=5000)
-    def get_aliases_for_room(self, room_id):
-        return self._simple_select_onecol(
-            "room_aliases",
-            {"room_id": room_id},
-            "room_alias",
-            desc="get_aliases_for_room",
+        self._invalidate_cache_and_stream(
+            txn, self.get_aliases_for_room, (room_id,)
         )
 
+        return room_id
+
     def update_aliases_for_room(self, old_room_id, new_room_id, creator):
         def _update_aliases_for_room_txn(txn):
             sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a2527d2a36..2956c3b3e0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,7 +39,6 @@ from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 
 from synapse.util.caches.descriptors import cached
-from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
@@ -416,6 +415,33 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
+    def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
+        """Gets details of the first event in a room at or after a stream ordering
+
+        Args:
+            room_id (str):
+            stream_ordering (int):
+
+        Returns:
+            Deferred[(int, int, str)]:
+                (stream ordering, topological ordering, event_id)
+        """
+        def _f(txn):
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering >= ?"
+                " AND NOT outlier"
+                " ORDER BY stream_ordering"
+                " LIMIT 1"
+            )
+            txn.execute(sql, (room_id, stream_ordering, ))
+            return txn.fetchone()
+
+        return self.runInteraction(
+            "get_room_event_after_stream_ordering", _f,
+        )
+
     @defer.inlineCallbacks
     def get_room_events_max_id(self, room_id=None):
         """Returns the current token for rooms stream.
@@ -718,81 +744,6 @@ class StreamStore(StreamWorkerStore):
         return self._backfill_id_gen.get_current_token()
 
     @defer.inlineCallbacks
-    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
-        # NB this lives here instead of appservice.py so we can reuse the
-        # 'private' StreamToken class in this file.
-        if limit:
-            limit = max(limit, MAX_STREAM_SIZE)
-        else:
-            limit = MAX_STREAM_SIZE
-
-        # From and to keys should be integers from ordering.
-        from_id = RoomStreamToken.parse_stream_token(from_key)
-        to_id = RoomStreamToken.parse_stream_token(to_key)
-
-        if from_key == to_key:
-            defer.returnValue(([], to_key))
-            return
-
-        # select all the events between from/to with a sensible limit
-        sql = (
-            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
-            "e.stream_ordering FROM events AS e "
-            "LEFT JOIN state_events as s ON "
-            "e.event_id = s.event_id "
-            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
-            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
-        ) % {
-            "limit": limit
-        }
-
-        def f(txn):
-            # pull out all the events between the tokens
-            txn.execute(sql, (from_id.stream, to_id.stream,))
-            rows = self.cursor_to_dict(txn)
-
-            # Logic:
-            #  - We want ALL events which match the AS room_id regex
-            #  - We want ALL events which match the rooms represented by the AS
-            #    room_alias regex
-            #  - We want ALL events for rooms that AS users have joined.
-            # This is currently supported via get_app_service_rooms (which is
-            # used for the Notifier listener rooms). We can't reasonably make a
-            # SQL query for these room IDs, so we'll pull all the events between
-            # from/to and filter in python.
-            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
-            room_ids_for_as = [r.room_id for r in rooms_for_as]
-
-            def app_service_interested(row):
-                if row["room_id"] in room_ids_for_as:
-                    return True
-
-                if row["type"] == EventTypes.Member:
-                    if service.is_interested_in_user(row.get("state_key")):
-                        return True
-                return False
-
-            return [r for r in rows if app_service_interested(r)]
-
-        rows = yield self.runInteraction("get_appservice_room_stream", f)
-
-        ret = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
-        )
-
-        self._set_before_and_after(ret, rows, topo_order=from_id is None)
-
-        if rows:
-            key = "s%d" % max(r["stream_ordering"] for r in rows)
-        else:
-            # Assume we didn't get anything because there was nothing to
-            # get.
-            key = to_key
-
-        defer.returnValue((ret, key))
-
-    @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
                              direction='b', limit=-1, event_filter=None):
         # Tokens really represent positions between elements, but we use