summary refs log tree commit diff
diff options
context:
space:
mode:
authorConnor Davis <mail@connordav.is>2022-09-07 05:54:44 -0400
committerGitHub <noreply@github.com>2022-09-07 10:54:44 +0100
commitbb5b47b62a11b14a3458e5a8aafd9ddaf1294199 (patch)
tree9b60345e6b66f074364b91e4251b916440809320
parentRemove the unspecced room_id field in the /hierarchy response. (#13506) (diff)
downloadsynapse-bb5b47b62a11b14a3458e5a8aafd9ddaf1294199.tar.xz
Add Admin API to Fetch Messages Within a Particular Window (#13672)
This adds two new admin APIs that allow us to fetch messages from a room within a particular time.
-rw-r--r--changelog.d/13672.feature1
-rw-r--r--docs/admin_api/rooms.md145
-rw-r--r--synapse/handlers/pagination.py37
-rw-r--r--synapse/rest/admin/__init__.py4
-rw-r--r--synapse/rest/admin/rooms.py104
-rw-r--r--tests/rest/admin/test_room.py158
6 files changed, 435 insertions, 14 deletions
diff --git a/changelog.d/13672.feature b/changelog.d/13672.feature
new file mode 100644
index 0000000000..2334e6fe15
--- /dev/null
+++ b/changelog.d/13672.feature
@@ -0,0 +1 @@
+Add admin APIs to fetch messages within a particular window of time.
diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md
index 7526956bec..8f727b363e 100644
--- a/docs/admin_api/rooms.md
+++ b/docs/admin_api/rooms.md
@@ -393,6 +393,151 @@ A response body like the following is returned:
 }
 ```
 
+# Room Messages API
+
+The Room Messages admin API allows server admins to get all messages
+sent to a room in a given timeframe. There are various parameters available
+that allow for filtering and ordering the returned list. This API supports pagination.
+
+To use it, you will need to authenticate by providing an `access_token`
+for a server admin: see [Admin API](../usage/administration/admin_api).
+
+This endpoint mirrors the [Matrix Spec defined Messages API](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages).
+
+The API is:
+```
+GET /_synapse/admin/v1/rooms/<room_id>/messages
+```
+
+**Parameters**
+
+The following path parameters are required:
+
+* `room_id` - The ID of the room you wish you fetch messages from.
+
+The following query parameters are available:
+
+* `from` (required) - The token to start returning events from. This token can be obtained from a prev_batch
+  or next_batch token returned by the /sync endpoint, or from an end token returned by a previous request to this endpoint.
+* `to` - The token to spot returning events at.
+* `limit` - The maximum number of events to return. Defaults to `10`.
+* `filter` - A JSON RoomEventFilter to filter returned events with.
+* `dir` - The direction to return events from. Either `f` for forwards or `b` for backwards. Setting
+  this value to `b` will reverse the above sort order. Defaults to `f`.
+
+**Response**
+
+The following fields are possible in the JSON response body:
+
+* `chunk` - A list of room events. The order depends on the dir parameter.
+          Note that an empty chunk does not necessarily imply that no more events are available. Clients should continue to paginate until no end property is returned.
+* `end` - A token corresponding to the end of chunk. This token can be passed back to this endpoint to request further events.
+          If no further events are available, this property is omitted from the response.
+* `start` - A token corresponding to the start of chunk.
+* `state` - A list of state events relevant to showing the chunk.
+
+**Example**
+
+For more details on each chunk, read [the Matrix specification](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages).
+
+```json
+{
+  "chunk": [
+    {
+      "content": {
+        "body": "This is an example text message",
+        "format": "org.matrix.custom.html",
+        "formatted_body": "<b>This is an example text message</b>",
+        "msgtype": "m.text"
+      },
+      "event_id": "$143273582443PhrSn:example.org",
+      "origin_server_ts": 1432735824653,
+      "room_id": "!636q39766251:example.com",
+      "sender": "@example:example.org",
+      "type": "m.room.message",
+      "unsigned": {
+        "age": 1234
+      }
+    },
+    {
+      "content": {
+        "name": "The room name"
+      },
+      "event_id": "$143273582443PhrSn:example.org",
+      "origin_server_ts": 1432735824653,
+      "room_id": "!636q39766251:example.com",
+      "sender": "@example:example.org",
+      "state_key": "",
+      "type": "m.room.name",
+      "unsigned": {
+        "age": 1234
+      }
+    },
+    {
+      "content": {
+        "body": "Gangnam Style",
+        "info": {
+          "duration": 2140786,
+          "h": 320,
+          "mimetype": "video/mp4",
+          "size": 1563685,
+          "thumbnail_info": {
+            "h": 300,
+            "mimetype": "image/jpeg",
+            "size": 46144,
+            "w": 300
+          },
+          "thumbnail_url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe",
+          "w": 480
+        },
+        "msgtype": "m.video",
+        "url": "mxc://example.org/a526eYUSFFxlgbQYZmo442"
+      },
+      "event_id": "$143273582443PhrSn:example.org",
+      "origin_server_ts": 1432735824653,
+      "room_id": "!636q39766251:example.com",
+      "sender": "@example:example.org",
+      "type": "m.room.message",
+      "unsigned": {
+        "age": 1234
+      }
+    }
+  ],
+  "end": "t47409-4357353_219380_26003_2265",
+  "start": "t47429-4392820_219380_26003_2265"
+}
+```
+
+# Room Timestamp to Event API
+
+The Room Timestamp to Event API endpoint fetches the `event_id` of the closest event to the given
+timestamp (`ts` query parameter) in the given direction (`dir` query parameter).
+
+Useful for cases like jump to date so you can start paginating messages from
+a given date in the archive.
+
+The API is:
+```
+  GET /_synapse/admin/v1/rooms/<room_id>/timestamp_to_event
+```
+
+**Parameters**
+
+The following path parameters are required:
+
+* `room_id` - The ID of the room you wish to check.
+
+The following query parameters are available:
+
+* `ts` - a timestamp in milliseconds where we will find the closest event in
+  the given direction.
+* `dir` - can be `f` or `b` to indicate forwards and backwards in time from the
+  given timestamp. Defaults to `f`.
+
+**Response**
+
+* `event_id` - converted from timestamp
+
 # Block Room API
 The Block Room admin API allows server admins to block and unblock rooms,
 and query to see if a given room is blocked.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a0c39778ab..1f83bab836 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -26,6 +26,7 @@ from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.room import ShutdownRoomResponse
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.rest.admin._base import assert_user_is_admin
 from synapse.storage.state import StateFilter
 from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, Requester, StreamKeyType
@@ -423,6 +424,7 @@ class PaginationHandler:
         pagin_config: PaginationConfig,
         as_client_event: bool = True,
         event_filter: Optional[Filter] = None,
+        use_admin_priviledge: bool = False,
     ) -> JsonDict:
         """Get messages in a room.
 
@@ -432,10 +434,16 @@ class PaginationHandler:
             pagin_config: The pagination config rules to apply, if any.
             as_client_event: True to get events in client-server format.
             event_filter: Filter to apply to results or None
+            use_admin_priviledge: if `True`, return all events, regardless
+                of whether `user` has access to them. To be used **ONLY**
+                from the admin API.
 
         Returns:
             Pagination API results
         """
+        if use_admin_priviledge:
+            await assert_user_is_admin(self.auth, requester)
+
         user_id = requester.user.to_string()
 
         if pagin_config.from_token:
@@ -458,12 +466,14 @@ class PaginationHandler:
         room_token = from_token.room_key
 
         async with self.pagination_lock.read(room_id):
-            (
-                membership,
-                member_event_id,
-            ) = await self.auth.check_user_in_room_or_world_readable(
-                room_id, requester, allow_departed_users=True
-            )
+            (membership, member_event_id) = (None, None)
+            if not use_admin_priviledge:
+                (
+                    membership,
+                    member_event_id,
+                ) = await self.auth.check_user_in_room_or_world_readable(
+                    room_id, requester, allow_departed_users=True
+                )
 
             if pagin_config.direction == "b":
                 # if we're going backwards, we might need to backfill. This
@@ -475,7 +485,7 @@ class PaginationHandler:
                         room_id, room_token.stream
                     )
 
-                if membership == Membership.LEAVE:
+                if not use_admin_priviledge and membership == Membership.LEAVE:
                     # If they have left the room then clamp the token to be before
                     # they left the room, to save the effort of loading from the
                     # database.
@@ -528,12 +538,13 @@ class PaginationHandler:
         if event_filter:
             events = await event_filter.filter(events)
 
-        events = await filter_events_for_client(
-            self._storage_controllers,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
+        if not use_admin_priviledge:
+            events = await filter_events_for_client(
+                self._storage_controllers,
+                user_id,
+                events,
+                is_peeking=(member_event_id is None),
+            )
 
         # if after the filter applied there are no more events
         # return immediately - but there might be more in next_token batch
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index fa3266720b..bac754e1b1 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -61,9 +61,11 @@ from synapse.rest.admin.rooms import (
     MakeRoomAdminRestServlet,
     RoomEventContextServlet,
     RoomMembersRestServlet,
+    RoomMessagesRestServlet,
     RoomRestServlet,
     RoomRestV2Servlet,
     RoomStateRestServlet,
+    RoomTimestampToEventRestServlet,
 )
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
 from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
@@ -271,6 +273,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     DestinationResetConnectionRestServlet(hs).register(http_server)
     DestinationRestServlet(hs).register(http_server)
     ListDestinationsRestServlet(hs).register(http_server)
+    RoomMessagesRestServlet(hs).register(http_server)
+    RoomTimestampToEventRestServlet(hs).register(http_server)
 
     # Some servlets only get registered for the main process.
     if hs.config.worker.worker_app is None:
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 3d870629c4..747e6fda83 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -35,6 +35,7 @@ from synapse.rest.admin._base import (
 )
 from synapse.storage.databases.main.room import RoomSortOrder
 from synapse.storage.state import StateFilter
+from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, RoomID, UserID, create_requester
 from synapse.util import json_decoder
 
@@ -858,3 +859,106 @@ class BlockRoomRestServlet(RestServlet):
             await self._store.unblock_room(room_id)
 
         return HTTPStatus.OK, {"block": block}
+
+
+class RoomMessagesRestServlet(RestServlet):
+    """
+    Get messages list of a room.
+    """
+
+    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/messages$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._hs = hs
+        self._clock = hs.get_clock()
+        self._pagination_handler = hs.get_pagination_handler()
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+
+    async def on_GET(
+        self, request: SynapseRequest, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester)
+
+        pagination_config = await PaginationConfig.from_request(
+            self._store, request, default_limit=10
+        )
+        # Twisted will have processed the args by now.
+        assert request.args is not None
+        as_client_event = b"raw" not in request.args
+        filter_str = parse_string(request, "filter", encoding="utf-8")
+        if filter_str:
+            filter_json = urlparse.unquote(filter_str)
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
+            if (
+                event_filter
+                and event_filter.filter_json.get("event_format", "client")
+                == "federation"
+            ):
+                as_client_event = False
+        else:
+            event_filter = None
+
+        msgs = await self._pagination_handler.get_messages(
+            room_id=room_id,
+            requester=requester,
+            pagin_config=pagination_config,
+            as_client_event=as_client_event,
+            event_filter=event_filter,
+            use_admin_priviledge=True,
+        )
+
+        return HTTPStatus.OK, msgs
+
+
+class RoomTimestampToEventRestServlet(RestServlet):
+    """
+    API endpoint to fetch the `event_id` of the closest event to the given
+    timestamp (`ts` query parameter) in the given direction (`dir` query
+    parameter).
+
+    Useful for cases like jump to date so you can start paginating messages from
+    a given date in the archive.
+
+    `ts` is a timestamp in milliseconds where we will find the closest event in
+    the given direction.
+
+    `dir` can be `f` or `b` to indicate forwards and backwards in time from the
+    given timestamp.
+
+    GET /_synapse/admin/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
+    {
+        "event_id": ...
+    }
+    """
+
+    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+        self._timestamp_lookup_handler = hs.get_timestamp_lookup_handler()
+
+    async def on_GET(
+        self, request: SynapseRequest, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester)
+
+        timestamp = parse_integer(request, "ts", required=True)
+        direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
+
+        (
+            event_id,
+            origin_server_ts,
+        ) = await self._timestamp_lookup_handler.get_event_for_timestamp(
+            requester, room_id, timestamp, direction
+        )
+
+        return HTTPStatus.OK, {
+            "event_id": event_id,
+            "origin_server_ts": origin_server_ts,
+        }
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 9d71a97524..d156be82b0 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -11,6 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
+import time
 import urllib.parse
 from typing import List, Optional
 from unittest.mock import Mock
@@ -22,10 +24,11 @@ from twisted.test.proto_helpers import MemoryReactor
 import synapse.rest.admin
 from synapse.api.constants import EventTypes, Membership, RoomTypes
 from synapse.api.errors import Codes
-from synapse.handlers.pagination import PaginationHandler
+from synapse.handlers.pagination import PaginationHandler, PurgeStatus
 from synapse.rest.client import directory, events, login, room
 from synapse.server import HomeServer
 from synapse.util import Clock
+from synapse.util.stringutils import random_string
 
 from tests import unittest
 
@@ -1793,6 +1796,159 @@ class RoomTestCase(unittest.HomeserverTestCase):
         self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
 
 
+class RoomMessagesTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        self.user = self.register_user("foo", "pass")
+        self.user_tok = self.login("foo", "pass")
+        self.room_id = self.helper.create_room_as(self.user, tok=self.user_tok)
+
+    def test_timestamp_to_event(self) -> None:
+        """Test that providing the current timestamp can get the last event."""
+        self.helper.send(self.room_id, body="message 1", tok=self.user_tok)
+        second_event_id = self.helper.send(
+            self.room_id, body="message 2", tok=self.user_tok
+        )["event_id"]
+        ts = str(round(time.time() * 1000))
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/timestamp_to_event?dir=b&ts=%s"
+            % (self.room_id, ts),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code)
+        self.assertIn("event_id", channel.json_body)
+        self.assertEqual(second_event_id, channel.json_body["event_id"])
+
+    def test_topo_token_is_accepted(self) -> None:
+        """Test Topo Token is accepted."""
+        token = "t1-0_0_0_0_0_0_0_0_0"
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code)
+        self.assertIn("start", channel.json_body)
+        self.assertEqual(token, channel.json_body["start"])
+        self.assertIn("chunk", channel.json_body)
+        self.assertIn("end", channel.json_body)
+
+    def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
+        """Test that stream token is accepted for forward pagination."""
+        token = "s0_0_0_0_0_0_0_0_0"
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code)
+        self.assertIn("start", channel.json_body)
+        self.assertEqual(token, channel.json_body["start"])
+        self.assertIn("chunk", channel.json_body)
+        self.assertIn("end", channel.json_body)
+
+    def test_room_messages_purge(self) -> None:
+        """Test room messages can be retrieved by an admin that isn't in the room."""
+        store = self.hs.get_datastores().main
+        pagination_handler = self.hs.get_pagination_handler()
+
+        # Send a first message in the room, which will be removed by the purge.
+        first_event_id = self.helper.send(
+            self.room_id, body="message 1", tok=self.user_tok
+        )["event_id"]
+        first_token = self.get_success(
+            store.get_topological_token_for_event(first_event_id)
+        )
+        first_token_str = self.get_success(first_token.to_string(store))
+
+        # Send a second message in the room, which won't be removed, and which we'll
+        # use as the marker to purge events before.
+        second_event_id = self.helper.send(
+            self.room_id, body="message 2", tok=self.user_tok
+        )["event_id"]
+        second_token = self.get_success(
+            store.get_topological_token_for_event(second_event_id)
+        )
+        second_token_str = self.get_success(second_token.to_string(store))
+
+        # Send a third event in the room to ensure we don't fall under any edge case
+        # due to our marker being the latest forward extremity in the room.
+        self.helper.send(self.room_id, body="message 3", tok=self.user_tok)
+
+        # Check that we get the first and second message when querying /messages.
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
+            % (
+                self.room_id,
+                second_token_str,
+                json.dumps({"types": [EventTypes.Message]}),
+            ),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
+
+        # Purge every event before the second event.
+        purge_id = random_string(16)
+        pagination_handler._purges_by_id[purge_id] = PurgeStatus()
+        self.get_success(
+            pagination_handler._purge_history(
+                purge_id=purge_id,
+                room_id=self.room_id,
+                token=second_token_str,
+                delete_local_events=True,
+            )
+        )
+
+        # Check that we only get the second message through /message now that the first
+        # has been purged.
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
+            % (
+                self.room_id,
+                second_token_str,
+                json.dumps({"types": [EventTypes.Message]}),
+            ),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 1, [event["content"] for event in chunk])
+
+        # Check that we get no event, but also no error, when querying /messages with
+        # the token that was pointing at the first event, because we don't have it
+        # anymore.
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
+            % (
+                self.room_id,
+                first_token_str,
+                json.dumps({"types": [EventTypes.Message]}),
+            ),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 0, [event["content"] for event in chunk])
+
+
 class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
 
     servlets = [