summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--docs/admin_api/purge_history_api.rst11
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/room.py9
-rw-r--r--synapse/handlers/sync.py10
-rw-r--r--synapse/replication/slave/storage/directory.py8
-rw-r--r--synapse/rest/client/v1/admin.py58
-rw-r--r--synapse/storage/appservice.py82
-rw-r--r--synapse/storage/directory.py50
-rw-r--r--synapse/storage/stream.py103
9 files changed, 140 insertions, 205 deletions
diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst
index a3a17e9f9f..acf1bc5749 100644
--- a/docs/admin_api/purge_history_api.rst
+++ b/docs/admin_api/purge_history_api.rst
@@ -8,9 +8,9 @@ Depending on the amount of history being purged a call to the API may take
 several minutes or longer. During this period users will not be able to
 paginate further back in the room from the point being purged from.
 
-The API is simply:
+The API is:
 
-``POST /_matrix/client/r0/admin/purge_history/<room_id>/<event_id>``
+``POST /_matrix/client/r0/admin/purge_history/<room_id>[/<event_id>]``
 
 including an ``access_token`` of a server admin.
 
@@ -25,3 +25,10 @@ To delete local events as well, set ``delete_local_events`` in the body:
    {
        "delete_local_events": true
    }
+
+The caller must specify the point in the room to purge up to. This can be
+specified by including an event_id in the URI, or by setting a
+``purge_up_to_event_id`` or ``purge_up_to_ts`` in the request body. If an event
+id is given, that event (and others at the same graph depth) will be retained.
+If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch,
+in milliseconds.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7d28c2745c..dd00d8a86c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -52,16 +52,12 @@ class MessageHandler(BaseHandler):
         self.pagination_lock = ReadWriteLock()
 
     @defer.inlineCallbacks
-    def purge_history(self, room_id, event_id, delete_local_events=False):
-        event = yield self.store.get_event(event_id)
-
-        if event.room_id != room_id:
-            raise SynapseError(400, "Event is for wrong room.")
-
-        depth = event.depth
-
+    def purge_history(self, room_id, topological_ordering,
+                      delete_local_events=False):
         with (yield self.pagination_lock.write(room_id)):
-            yield self.store.purge_history(room_id, depth, delete_local_events)
+            yield self.store.purge_history(
+                room_id, topological_ordering, delete_local_events,
+            )
 
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6c425828c1..8df8fcbbad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -475,12 +475,9 @@ class RoomEventSource(object):
             user.to_string()
         )
         if app_service:
-            events, end_key = yield self.store.get_appservice_room_stream(
-                service=app_service,
-                from_key=from_key,
-                to_key=to_key,
-                limit=limit,
-            )
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
         else:
             room_events = yield self.store.get_membership_changes_for_user(
                 user.to_string(), from_key, to_key
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b12988f3c9..56b86356f2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -998,8 +998,9 @@ class SyncHandler(object):
 
         app_service = self.store.get_app_service_by_user_id(user_id)
         if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
         else:
             joined_room_ids = yield self.store.get_rooms_for_user(user_id)
 
@@ -1030,8 +1031,9 @@ class SyncHandler(object):
 
         app_service = self.store.get_app_service_by_user_id(user_id)
         if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
         else:
             joined_room_ids = yield self.store.get_rooms_for_user(user_id)
 
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 7301d885f2..6deecd3963 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -14,10 +14,8 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage.directory import DirectoryStore
+from synapse.storage.directory import DirectoryWorkerStore
 
 
-class DirectoryStore(BaseSlavedStore):
-    get_aliases_for_room = DirectoryStore.__dict__[
-        "get_aliases_for_room"
-    ]
+class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 3917eee42d..dcf6215dad 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
-from synapse.api.errors import AuthError, SynapseError
+from synapse.api.errors import AuthError, SynapseError, Codes
 from synapse.types import UserID, create_requester
 from synapse.http.servlet import parse_json_object_from_request
 
@@ -114,12 +114,18 @@ class PurgeMediaCacheRestServlet(ClientV1RestServlet):
 
 class PurgeHistoryRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns(
-        "/admin/purge_history/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
+        "/admin/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
     )
 
     def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer)
+        """
         super(PurgeHistoryRestServlet, self).__init__(hs)
         self.handlers = hs.get_handlers()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_POST(self, request, room_id, event_id):
@@ -133,8 +139,54 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
 
         delete_local_events = bool(body.get("delete_local_events", False))
 
+        # establish the topological ordering we should keep events from. The
+        # user can provide an event_id in the URL or the request body, or can
+        # provide a timestamp in the request body.
+        if event_id is None:
+            event_id = body.get('purge_up_to_event_id')
+
+        if event_id is not None:
+            event = yield self.store.get_event(event_id)
+
+            if event.room_id != room_id:
+                raise SynapseError(400, "Event is for wrong room.")
+
+            depth = event.depth
+            logger.info(
+                "[purge] purging up to depth %i (event_id %s)",
+                depth, event_id,
+            )
+        elif 'purge_up_to_ts' in body:
+            ts = body['purge_up_to_ts']
+            if not isinstance(ts, int):
+                raise SynapseError(
+                    400, "purge_up_to_ts must be an int",
+                    errcode=Codes.BAD_JSON,
+                )
+
+            stream_ordering = (
+                yield self.store.find_first_stream_ordering_after_ts(ts)
+            )
+
+            (_, depth, _) = (
+                yield self.store.get_room_event_after_stream_ordering(
+                    room_id, stream_ordering,
+                )
+            )
+            logger.info(
+                "[purge] purging up to depth %i (received_ts %i => "
+                "stream_ordering %i)",
+                depth, ts, stream_ordering,
+            )
+        else:
+            raise SynapseError(
+                400,
+                "must specify purge_up_to_event_id or purge_up_to_ts",
+                errcode=Codes.BAD_JSON,
+            )
+
         yield self.handlers.message_handler.purge_history(
-            room_id, event_id,
+            room_id, depth,
             delete_local_events=delete_local_events,
         )
 
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 90fb51d43c..12ea8a158c 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -18,11 +18,9 @@ import re
 import simplejson as json
 from twisted.internet import defer
 
-from synapse.api.constants import Membership
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
 from synapse.storage.events import EventsWorkerStore
-from synapse.storage.roommember import RoomsForUser
 from ._base import SQLBaseStore
 
 
@@ -115,81 +113,11 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
 
 
 class ApplicationServiceStore(ApplicationServiceWorkerStore):
-
-    def __init__(self, db_conn, hs):
-        super(ApplicationServiceStore, self).__init__(db_conn, hs)
-        self.hostname = hs.hostname
-
-    def get_app_service_rooms(self, service):
-        """Get a list of RoomsForUser for this application service.
-
-        Application services may be "interested" in lots of rooms depending on
-        the room ID, the room aliases, or the members in the room. This function
-        takes all of these into account and returns a list of RoomsForUser which
-        represent the entire list of room IDs that this application service
-        wants to know about.
-
-        Args:
-            service: The application service to get a room list for.
-        Returns:
-            A list of RoomsForUser.
-        """
-        return self.runInteraction(
-            "get_app_service_rooms",
-            self._get_app_service_rooms_txn,
-            service,
-        )
-
-    def _get_app_service_rooms_txn(self, txn, service):
-        # get all rooms matching the room ID regex.
-        room_entries = self._simple_select_list_txn(
-            txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
-        )
-        matching_room_list = set([
-            r["room_id"] for r in room_entries if
-            service.is_interested_in_room(r["room_id"])
-        ])
-
-        # resolve room IDs for matching room alias regex.
-        room_alias_mappings = self._simple_select_list_txn(
-            txn=txn, table="room_aliases", keyvalues=None,
-            retcols=["room_id", "room_alias"]
-        )
-        matching_room_list |= set([
-            r["room_id"] for r in room_alias_mappings if
-            service.is_interested_in_alias(r["room_alias"])
-        ])
-
-        # get all rooms for every user for this AS. This is scoped to users on
-        # this HS only.
-        user_list = self._simple_select_list_txn(
-            txn=txn, table="users", keyvalues=None, retcols=["name"]
-        )
-        user_list = [
-            u["name"] for u in user_list if
-            service.is_interested_in_user(u["name"])
-        ]
-        rooms_for_user_matching_user_id = set()  # RoomsForUser list
-        for user_id in user_list:
-            # FIXME: This assumes this store is linked with RoomMemberStore :(
-            rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
-                txn=txn,
-                user_id=user_id,
-                membership_list=[Membership.JOIN]
-            )
-            rooms_for_user_matching_user_id |= set(rooms_for_user)
-
-        # make RoomsForUser tuples for room ids and aliases which are not in the
-        # main rooms_for_user_list - e.g. they are rooms which do not have AS
-        # registered users in it.
-        known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
-        missing_rooms_for_user = [
-            RoomsForUser(r, service.sender, "join") for r in
-            matching_room_list if r not in known_room_ids
-        ]
-        rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
-
-        return rooms_for_user_matching_user_id
+    # This is currently empty due to there not being any AS storage functions
+    # that can't be run on the workers. Since this may change in future, and
+    # to keep consistency with the other stores, we keep this empty class for
+    # now.
+    pass
 
 
 class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 79e7c540ad..d0c0059757 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -29,8 +29,7 @@ RoomAliasMapping = namedtuple(
 )
 
 
-class DirectoryStore(SQLBaseStore):
-
+class DirectoryWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_association_from_room_alias(self, room_alias):
         """ Get's the room_id and server list for a given room_alias
@@ -69,6 +68,28 @@ class DirectoryStore(SQLBaseStore):
             RoomAliasMapping(room_id, room_alias.to_string(), servers)
         )
 
+    def get_room_alias_creator(self, room_alias):
+        return self._simple_select_one_onecol(
+            table="room_aliases",
+            keyvalues={
+                "room_alias": room_alias,
+            },
+            retcol="creator",
+            desc="get_room_alias_creator",
+            allow_none=True
+        )
+
+    @cached(max_entries=5000)
+    def get_aliases_for_room(self, room_id):
+        return self._simple_select_onecol(
+            "room_aliases",
+            {"room_id": room_id},
+            "room_alias",
+            desc="get_aliases_for_room",
+        )
+
+
+class DirectoryStore(DirectoryWorkerStore):
     @defer.inlineCallbacks
     def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
         """ Creates an associatin between  a room alias and room_id/servers
@@ -116,17 +137,6 @@ class DirectoryStore(SQLBaseStore):
             )
         defer.returnValue(ret)
 
-    def get_room_alias_creator(self, room_alias):
-        return self._simple_select_one_onecol(
-            table="room_aliases",
-            keyvalues={
-                "room_alias": room_alias,
-            },
-            retcol="creator",
-            desc="get_room_alias_creator",
-            allow_none=True
-        )
-
     @defer.inlineCallbacks
     def delete_room_alias(self, room_alias):
         room_id = yield self.runInteraction(
@@ -135,7 +145,6 @@ class DirectoryStore(SQLBaseStore):
             room_alias,
         )
 
-        self.get_aliases_for_room.invalidate((room_id,))
         defer.returnValue(room_id)
 
     def _delete_room_alias_txn(self, txn, room_alias):
@@ -160,17 +169,12 @@ class DirectoryStore(SQLBaseStore):
             (room_alias.to_string(),)
         )
 
-        return room_id
-
-    @cached(max_entries=5000)
-    def get_aliases_for_room(self, room_id):
-        return self._simple_select_onecol(
-            "room_aliases",
-            {"room_id": room_id},
-            "room_alias",
-            desc="get_aliases_for_room",
+        self._invalidate_cache_and_stream(
+            txn, self.get_aliases_for_room, (room_id,)
         )
 
+        return room_id
+
     def update_aliases_for_room(self, old_room_id, new_room_id, creator):
         def _update_aliases_for_room_txn(txn):
             sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a2527d2a36..2956c3b3e0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,7 +39,6 @@ from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 
 from synapse.util.caches.descriptors import cached
-from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
@@ -416,6 +415,33 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
+    def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
+        """Gets details of the first event in a room at or after a stream ordering
+
+        Args:
+            room_id (str):
+            stream_ordering (int):
+
+        Returns:
+            Deferred[(int, int, str)]:
+                (stream ordering, topological ordering, event_id)
+        """
+        def _f(txn):
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering >= ?"
+                " AND NOT outlier"
+                " ORDER BY stream_ordering"
+                " LIMIT 1"
+            )
+            txn.execute(sql, (room_id, stream_ordering, ))
+            return txn.fetchone()
+
+        return self.runInteraction(
+            "get_room_event_after_stream_ordering", _f,
+        )
+
     @defer.inlineCallbacks
     def get_room_events_max_id(self, room_id=None):
         """Returns the current token for rooms stream.
@@ -718,81 +744,6 @@ class StreamStore(StreamWorkerStore):
         return self._backfill_id_gen.get_current_token()
 
     @defer.inlineCallbacks
-    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
-        # NB this lives here instead of appservice.py so we can reuse the
-        # 'private' StreamToken class in this file.
-        if limit:
-            limit = max(limit, MAX_STREAM_SIZE)
-        else:
-            limit = MAX_STREAM_SIZE
-
-        # From and to keys should be integers from ordering.
-        from_id = RoomStreamToken.parse_stream_token(from_key)
-        to_id = RoomStreamToken.parse_stream_token(to_key)
-
-        if from_key == to_key:
-            defer.returnValue(([], to_key))
-            return
-
-        # select all the events between from/to with a sensible limit
-        sql = (
-            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
-            "e.stream_ordering FROM events AS e "
-            "LEFT JOIN state_events as s ON "
-            "e.event_id = s.event_id "
-            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
-            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
-        ) % {
-            "limit": limit
-        }
-
-        def f(txn):
-            # pull out all the events between the tokens
-            txn.execute(sql, (from_id.stream, to_id.stream,))
-            rows = self.cursor_to_dict(txn)
-
-            # Logic:
-            #  - We want ALL events which match the AS room_id regex
-            #  - We want ALL events which match the rooms represented by the AS
-            #    room_alias regex
-            #  - We want ALL events for rooms that AS users have joined.
-            # This is currently supported via get_app_service_rooms (which is
-            # used for the Notifier listener rooms). We can't reasonably make a
-            # SQL query for these room IDs, so we'll pull all the events between
-            # from/to and filter in python.
-            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
-            room_ids_for_as = [r.room_id for r in rooms_for_as]
-
-            def app_service_interested(row):
-                if row["room_id"] in room_ids_for_as:
-                    return True
-
-                if row["type"] == EventTypes.Member:
-                    if service.is_interested_in_user(row.get("state_key")):
-                        return True
-                return False
-
-            return [r for r in rows if app_service_interested(r)]
-
-        rows = yield self.runInteraction("get_appservice_room_stream", f)
-
-        ret = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
-        )
-
-        self._set_before_and_after(ret, rows, topo_order=from_id is None)
-
-        if rows:
-            key = "s%d" % max(r["stream_ordering"] for r in rows)
-        else:
-            # Assume we didn't get anything because there was nothing to
-            # get.
-            key = to_key
-
-        defer.returnValue((ret, key))
-
-    @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
                              direction='b', limit=-1, event_filter=None):
         # Tokens really represent positions between elements, but we use