From ee7a1cabd8c6d218b838295fde6999dcbc23036b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 16 Jan 2018 13:04:01 +0000 Subject: document metrics changes --- docs/metrics-howto.rst | 61 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 7 deletions(-) (limited to 'docs') diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst index 143cd0f42f..8acc479bc3 100644 --- a/docs/metrics-howto.rst +++ b/docs/metrics-howto.rst @@ -16,7 +16,7 @@ How to monitor Synapse metrics using Prometheus metrics_port: 9092 Also ensure that ``enable_metrics`` is set to ``True``. - + Restart synapse. 3. Add a prometheus target for synapse. @@ -28,11 +28,58 @@ How to monitor Synapse metrics using Prometheus static_configs: - targets: ["my.server.here:9092"] - If your prometheus is older than 1.5.2, you will need to replace + If your prometheus is older than 1.5.2, you will need to replace ``static_configs`` in the above with ``target_groups``. - + Restart prometheus. + +Block and response metrics renamed for 0.27.0 +--------------------------------------------- + +Synapse 0.27.0 begins the process of rationalising the duplicate ``*:count`` +metrics reported for the resource tracking for code blocks and HTTP requests. + +At the same time, the corresponding ``*:total`` metrics are being renamed, as +the ``:total`` suffix no longer makes sense in the absence of a corresponding +``:count`` metric. + +To enable a graceful migration path, this release just adds new names for the +metrics being renamed. A future release will remove the old ones. + +The following table shows the new metrics, and the old metrics which they are +replacing. + +==================================================== =================================================== +New name Old name +==================================================== =================================================== +synapse_util_metrics_block_count synapse_util_metrics_block_timer:count +synapse_util_metrics_block_count synapse_util_metrics_block_ru_utime:count +synapse_util_metrics_block_count synapse_util_metrics_block_ru_stime:count +synapse_util_metrics_block_count synapse_util_metrics_block_db_txn_count:count +synapse_util_metrics_block_count synapse_util_metrics_block_db_txn_duration:count + +synapse_util_metrics_block_time_seconds synapse_util_metrics_block_timer:total +synapse_util_metrics_block_ru_utime_seconds synapse_util_metrics_block_ru_utime:total +synapse_util_metrics_block_ru_stime_seconds synapse_util_metrics_block_ru_stime:total +synapse_util_metrics_block_db_txn_count synapse_util_metrics_block_db_txn_count:total +synapse_util_metrics_block_db_txn_duration_seconds synapse_util_metrics_block_db_txn_duration:total + +synapse_http_server_response_count synapse_http_server_requests +synapse_http_server_response_count synapse_http_server_response_time:count +synapse_http_server_response_count synapse_http_server_response_ru_utime:count +synapse_http_server_response_count synapse_http_server_response_ru_stime:count +synapse_http_server_response_count synapse_http_server_response_db_txn_count:count +synapse_http_server_response_count synapse_http_server_response_db_txn_duration:count + +synapse_http_server_response_time_seconds synapse_http_server_response_time:total +synapse_http_server_response_ru_utime_seconds synapse_http_server_response_ru_utime:total +synapse_http_server_response_ru_stime_seconds synapse_http_server_response_ru_stime:total +synapse_http_server_response_db_txn_count synapse_http_server_response_db_txn_count:total +synapse_http_server_response_db_txn_duration_seconds synapse_http_server_response_db_txn_duration:total +==================================================== =================================================== + + Standard Metric Names --------------------- @@ -42,7 +89,7 @@ have been changed to seconds, from miliseconds. ================================== ============================= New name Old name ----------------------------------- ----------------------------- +================================== ============================= process_cpu_user_seconds_total process_resource_utime / 1000 process_cpu_system_seconds_total process_resource_stime / 1000 process_open_fds (no 'type' label) process_fds @@ -52,8 +99,8 @@ The python-specific counts of garbage collector performance have been renamed. =========================== ====================== New name Old name ---------------------------- ---------------------- -python_gc_time reactor_gc_time +=========================== ====================== +python_gc_time reactor_gc_time python_gc_unreachable_total reactor_gc_unreachable python_gc_counts reactor_gc_counts =========================== ====================== @@ -62,7 +109,7 @@ The twisted-specific reactor metrics have been renamed. ==================================== ===================== New name Old name ------------------------------------- --------------------- +==================================== ===================== python_twisted_reactor_pending_calls reactor_pending_calls python_twisted_reactor_tick_time reactor_tick_time ==================================== ===================== -- cgit 1.5.1 From 3af53c183a0ab5d30ce0fb40e9b8eee8da7ad75a Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Wed, 31 Jan 2018 08:15:59 -0700 Subject: Add admin api documentation for list media endpoint Signed-off-by: Travis Ralston --- docs/admin_api/media_admin_api.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 docs/admin_api/media_admin_api.md (limited to 'docs') diff --git a/docs/admin_api/media_admin_api.md b/docs/admin_api/media_admin_api.md new file mode 100644 index 0000000000..abdbc1ea86 --- /dev/null +++ b/docs/admin_api/media_admin_api.md @@ -0,0 +1,23 @@ +# List all media in a room + +This API gets a list of known media in a room. + +The API is: +``` +GET /_matrix/client/r0/admin/room//media +``` +including an `access_token` of a server admin. + +It returns a JSON body like the following: +``` +{ + "local": [ + "mxc://localhost/xwvutsrqponmlkjihgfedcba", + "mxc://localhost/abcdefghijklmnopqrstuvwx" + ], + "remote": [ + "mxc://matrix.org/xwvutsrqponmlkjihgfedcba", + "mxc://matrix.org/abcdefghijklmnopqrstuvwx" + ] +} +``` -- cgit 1.5.1 From f133228cb35b7803910688e7060772cb9e64f01a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Feb 2018 17:23:13 +0000 Subject: Add note in docs/workers.rst --- docs/workers.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'docs') diff --git a/docs/workers.rst b/docs/workers.rst index b39f79058e..213d57e47c 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -207,3 +207,14 @@ the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration file. For example:: worker_main_http_uri: http://127.0.0.1:8008 + + +``synapse.app.event_creator`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Handles non-state event creation. It can handle REST endpoints matching: + + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send + +It will create events locally and then send them on to the main synapse +instance to be persisted and handled. -- cgit 1.5.1 From 74fcbf741b3a7b95b5cc44478050e8a40fb7dc46 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 8 Feb 2018 18:44:52 +0000 Subject: delete_local_events for purge_history Add a flag which makes the purger delete local events --- docs/admin_api/purge_history_api.rst | 14 ++++++++++++-- synapse/handlers/message.py | 4 ++-- synapse/http/servlet.py | 18 +++++++++++++++--- synapse/rest/client/v1/admin.py | 11 ++++++++++- synapse/storage/events.py | 35 ++++++++++++++++++++++++++++------- 5 files changed, 67 insertions(+), 15 deletions(-) (limited to 'docs') diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index 08b3306366..b4e5bd9d75 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -4,8 +4,6 @@ Purge History API The purge history API allows server admins to purge historic events from their database, reclaiming disk space. -**NB!** This will not delete local events (locally sent messages content etc) from the database, but will remove lots of the metadata about them and does dramatically reduce the on disk space usage - Depending on the amount of history being purged a call to the API may take several minutes or longer. During this period users will not be able to paginate further back in the room from the point being purged from. @@ -15,3 +13,15 @@ The API is simply: ``POST /_matrix/client/r0/admin/purge_history//`` including an ``access_token`` of a server admin. + +By default, events sent by local users are not deleted, as they may represent +the only copies of this content in existence. (Events sent by remote users are +deleted, and room state data before the cutoff is always removed). + +To delete local events as well, set ``delete_local_events`` in the body: + +.. code:: json + + { + "delete_local_events": True, + } diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c7860bb05..276d1a7722 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -63,7 +63,7 @@ class MessageHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() @defer.inlineCallbacks - def purge_history(self, room_id, event_id): + def purge_history(self, room_id, event_id, delete_local_events=False): event = yield self.store.get_event(event_id) if event.room_id != room_id: @@ -72,7 +72,7 @@ class MessageHandler(BaseHandler): depth = event.depth with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history(room_id, depth) + yield self.store.purge_history(room_id, depth, delete_local_events) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 71420e54db..ef8e62901b 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -148,11 +148,13 @@ def parse_string_from_args(args, name, default=None, required=False, return default -def parse_json_value_from_request(request): +def parse_json_value_from_request(request, allow_empty_body=False): """Parse a JSON value from the body of a twisted HTTP request. Args: request: the twisted HTTP request. + allow_empty_body (bool): if True, an empty body will be accepted and + turned into None Returns: The JSON value. @@ -165,6 +167,9 @@ def parse_json_value_from_request(request): except Exception: raise SynapseError(400, "Error reading JSON content.") + if not content_bytes and allow_empty_body: + return None + try: content = simplejson.loads(content_bytes) except Exception as e: @@ -174,17 +179,24 @@ def parse_json_value_from_request(request): return content -def parse_json_object_from_request(request): +def parse_json_object_from_request(request, allow_empty_body=False): """Parse a JSON object from the body of a twisted HTTP request. Args: request: the twisted HTTP request. + allow_empty_body (bool): if True, an empty body will be accepted and + turned into an empty dict. Raises: SynapseError if the request body couldn't be decoded as JSON or if it wasn't a JSON object. """ - content = parse_json_value_from_request(request) + content = parse_json_value_from_request( + request, allow_empty_body=allow_empty_body, + ) + + if allow_empty_body and content is None: + return {} if type(content) != dict: message = "Content must be a JSON object." diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 5022808ea9..f954d2ea65 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -128,7 +128,16 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - yield self.handlers.message_handler.purge_history(room_id, event_id) + body = parse_json_object_from_request(request, allow_empty_body=True) + + delete_local_events = bool( + body.get("delete_local_history", False) + ) + + yield self.handlers.message_handler.purge_history( + room_id, event_id, + delete_local_events=delete_local_events, + ) defer.returnValue((200, {})) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 24d9978304..11a2ff2d8a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2031,16 +2031,32 @@ class EventsStore(SQLBaseStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) - def purge_history(self, room_id, topological_ordering): + def purge_history( + self, room_id, topological_ordering, delete_local_events, + ): """Deletes room history before a certain point + + Args: + room_id (str): + + topological_ordering (int): + minimum topo ordering to preserve + + delete_local_events (bool): + if True, we will delete local events as well as remote ones + (instead of just marking them as outliers and deleting their + state groups). """ return self.runInteraction( "purge_history", - self._purge_history_txn, room_id, topological_ordering + self._purge_history_txn, room_id, topological_ordering, + delete_local_events, ) - def _purge_history_txn(self, txn, room_id, topological_ordering): + def _purge_history_txn( + self, txn, room_id, topological_ordering, delete_local_events, + ): # Tables that should be pruned: # event_auth # event_backward_extremities @@ -2093,11 +2109,14 @@ class EventsStore(SQLBaseStore): to_delete = [ (event_id,) for event_id, state_key in event_rows - if state_key is None and not self.hs.is_mine_id(event_id) + if state_key is None and ( + delete_local_events or not self.hs.is_mine_id(event_id) + ) ] logger.info( - "[purge] found %i events before cutoff, of which %i are remote" - " non-state events to delete", len(event_rows), len(to_delete)) + "[purge] found %i events before cutoff, of which %i can be deleted", + len(event_rows), len(to_delete), + ) logger.info("[purge] Finding new backward extremities") @@ -2273,7 +2292,9 @@ class EventsStore(SQLBaseStore): " WHERE event_id = ?", [ (True, event_id,) for event_id, state_key in event_rows - if state_key is not None or self.hs.is_mine_id(event_id) + if state_key is not None or ( + not delete_local_events and self.hs.is_mine_id(event_id) + ) ] ) -- cgit 1.5.1 From 32c7b8e48b5c79de5b722afb4c2b79c6c712cdc5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Feb 2018 17:18:07 +0000 Subject: Update workers docs to include http port --- docs/workers.rst | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) (limited to 'docs') diff --git a/docs/workers.rst b/docs/workers.rst index 213d57e47c..b687807e59 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -30,17 +30,29 @@ requests made to the federation port. The caveats regarding running a reverse-proxy on the federation port still apply (see https://github.com/matrix-org/synapse/blob/master/README.rst#reverse-proxying-the-federation-port). -To enable workers, you need to add a replication listener to the master synapse, e.g.:: +To enable workers, you need to add two replication listeners to the master +synapse, e.g.:: listeners: + # The TCP replication port - port: 9092 bind_address: '127.0.0.1' type: replication + # The HTTP replication port + - port: 9093 + bind_address: '127.0.0.1' + type: http + resources: + - names: [replication] -Under **no circumstances** should this replication API listener be exposed to the -public internet; it currently implements no authentication whatsoever and is +Under **no circumstances** should these replication API listeners be exposed to +the public internet; it currently implements no authentication whatsoever and is unencrypted. +(Roughly, the TCP port is used for streaming data from the master to the +workers, and the HTTP port for the workers to communicate with the main +synapse process.) + You then create a set of configs for the various worker processes. These should be worker configuration files, and should be stored in a dedicated subdirectory, to allow synctl to manipulate them. @@ -52,8 +64,10 @@ You should minimise the number of overrides though to maintain a usable config. You must specify the type of worker application (``worker_app``). The currently available worker applications are listed below. You must also specify the -replication endpoint that it's talking to on the main synapse process -(``worker_replication_host`` and ``worker_replication_port``). +replication endpoints that it's talking to on the main synapse process. +``worker_replication_host`` should specify the host of the main synapse, +``worker_replication_port`` should point to the TCP replication listener port and +``worker_replication_http_port`` should point to the HTTP replication port. For instance:: @@ -62,6 +76,7 @@ For instance:: # The replication listener on the synapse to talk to. worker_replication_host: 127.0.0.1 worker_replication_port: 9092 + worker_replication_http_port: 9093 worker_listeners: - type: http -- cgit 1.5.1 From 8fd1a324564510be55a7c1e6b6339f736f5c525a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 13 Feb 2018 13:04:41 +0000 Subject: Fix typos in purge api & doc * It's supposed to be purge_local_events, not ..._history * Fix the doc to have valid json --- docs/admin_api/purge_history_api.rst | 2 +- synapse/rest/client/v1/admin.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) (limited to 'docs') diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index b4e5bd9d75..a3a17e9f9f 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -23,5 +23,5 @@ To delete local events as well, set ``delete_local_events`` in the body: .. code:: json { - "delete_local_events": True, + "delete_local_events": true } diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 2ad486c67d..6073cc6fa2 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -131,9 +131,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): body = parse_json_object_from_request(request, allow_empty_body=True) - delete_local_events = bool( - body.get("delete_local_history", False) - ) + delete_local_events = bool(body.get("delete_local_events", False)) yield self.handlers.message_handler.purge_history( room_id, event_id, -- cgit 1.5.1 From 059d3a6c8e55ab7e5318793b8d7c4546bb850d33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Feb 2018 17:53:56 +0000 Subject: Update docs --- docs/workers.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'docs') diff --git a/docs/workers.rst b/docs/workers.rst index b687807e59..dee04bbf3e 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -50,7 +50,7 @@ the public internet; it currently implements no authentication whatsoever and is unencrypted. (Roughly, the TCP port is used for streaming data from the master to the -workers, and the HTTP port for the workers to communicate with the main +workers, and the HTTP port for the workers to send data to the main synapse process.) You then create a set of configs for the various worker processes. These @@ -69,6 +69,9 @@ replication endpoints that it's talking to on the main synapse process. ``worker_replication_port`` should point to the TCP replication listener port and ``worker_replication_http_port`` should point to the HTTP replication port. +Currently, only the ``event_creator`` worker requires specifying +``worker_replication_http_port``. + For instance:: worker_app: synapse.app.synchrotron -- cgit 1.5.1 From f8bfcd7e0d2fc6399eb654a41773cd603b4037fc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 1 Mar 2018 23:20:54 +0000 Subject: Provide a means to pass a timestamp to purge_history --- docs/admin_api/purge_history_api.rst | 11 +++++-- synapse/handlers/message.py | 14 ++++----- synapse/rest/client/v1/admin.py | 58 ++++++++++++++++++++++++++++++++++-- synapse/storage/stream.py | 27 +++++++++++++++++ 4 files changed, 96 insertions(+), 14 deletions(-) (limited to 'docs') diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index a3a17e9f9f..acf1bc5749 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -8,9 +8,9 @@ Depending on the amount of history being purged a call to the API may take several minutes or longer. During this period users will not be able to paginate further back in the room from the point being purged from. -The API is simply: +The API is: -``POST /_matrix/client/r0/admin/purge_history//`` +``POST /_matrix/client/r0/admin/purge_history/[/]`` including an ``access_token`` of a server admin. @@ -25,3 +25,10 @@ To delete local events as well, set ``delete_local_events`` in the body: { "delete_local_events": true } + +The caller must specify the point in the room to purge up to. This can be +specified by including an event_id in the URI, or by setting a +``purge_up_to_event_id`` or ``purge_up_to_ts`` in the request body. If an event +id is given, that event (and others at the same graph depth) will be retained. +If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch, +in milliseconds. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d28c2745c..dd00d8a86c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -52,16 +52,12 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() @defer.inlineCallbacks - def purge_history(self, room_id, event_id, delete_local_events=False): - event = yield self.store.get_event(event_id) - - if event.room_id != room_id: - raise SynapseError(400, "Event is for wrong room.") - - depth = event.depth - + def purge_history(self, room_id, topological_ordering, + delete_local_events=False): with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history(room_id, depth, delete_local_events) + yield self.store.purge_history( + room_id, topological_ordering, delete_local_events, + ) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 3917eee42d..dcf6215dad 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import AuthError, SynapseError +from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import UserID, create_requester from synapse.http.servlet import parse_json_object_from_request @@ -114,12 +114,18 @@ class PurgeMediaCacheRestServlet(ClientV1RestServlet): class PurgeHistoryRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns( - "/admin/purge_history/(?P[^/]*)/(?P[^/]*)" + "/admin/purge_history/(?P[^/]*)(/(?P[^/]+))?" ) def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer) + """ super(PurgeHistoryRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.store = hs.get_datastore() @defer.inlineCallbacks def on_POST(self, request, room_id, event_id): @@ -133,8 +139,54 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): delete_local_events = bool(body.get("delete_local_events", False)) + # establish the topological ordering we should keep events from. The + # user can provide an event_id in the URL or the request body, or can + # provide a timestamp in the request body. + if event_id is None: + event_id = body.get('purge_up_to_event_id') + + if event_id is not None: + event = yield self.store.get_event(event_id) + + if event.room_id != room_id: + raise SynapseError(400, "Event is for wrong room.") + + depth = event.depth + logger.info( + "[purge] purging up to depth %i (event_id %s)", + depth, event_id, + ) + elif 'purge_up_to_ts' in body: + ts = body['purge_up_to_ts'] + if not isinstance(ts, int): + raise SynapseError( + 400, "purge_up_to_ts must be an int", + errcode=Codes.BAD_JSON, + ) + + stream_ordering = ( + yield self.store.find_first_stream_ordering_after_ts(ts) + ) + + (_, depth, _) = ( + yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering, + ) + ) + logger.info( + "[purge] purging up to depth %i (received_ts %i => " + "stream_ordering %i)", + depth, ts, stream_ordering, + ) + else: + raise SynapseError( + 400, + "must specify purge_up_to_event_id or purge_up_to_ts", + errcode=Codes.BAD_JSON, + ) + yield self.handlers.message_handler.purge_history( - room_id, event_id, + room_id, depth, delete_local_events=delete_local_events, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a2527d2a36..515a04699a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -416,6 +416,33 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + def get_room_event_after_stream_ordering(self, room_id, stream_ordering): + """Gets details of the first event in a room at or after a stream ordering + + Args: + room_id (str): + stream_ordering (int): + + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + def _f(txn): + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering >= ?" + " AND NOT outlier" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + txn.execute(sql, (room_id, stream_ordering, )) + return txn.fetchone() + + return self.runInteraction( + "get_room_event_after_stream_ordering", _f, + ) + @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None): """Returns the current token for rooms stream. -- cgit 1.5.1 From 20f40348d4ea55cc5b98528673e26bac7396a3cb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 7 Mar 2018 19:59:24 +0000 Subject: Factor run_in_background out from preserve_fn It annoys me that we create temporary function objects when there's really no need for it. Let's factor the gubbins out of preserve_fn and start using it. --- docs/log_contexts.rst | 8 +++---- synapse/util/logcontext.py | 53 +++++++++++++++++++++++++--------------------- 2 files changed, 33 insertions(+), 28 deletions(-) (limited to 'docs') diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index b19b7fa1ea..82ac4f91e5 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -279,9 +279,9 @@ Obviously that option means that the operations done in that might be fixed by setting a different logcontext via a ``with LoggingContext(...)`` in ``background_operation``). -The second option is to use ``logcontext.preserve_fn``, which wraps a function -so that it doesn't reset the logcontext even when it returns an incomplete -deferred, and adds a callback to the returned deferred to reset the +The second option is to use ``logcontext.run_in_background``, which wraps a +function so that it doesn't reset the logcontext even when it returns an +incomplete deferred, and adds a callback to the returned deferred to reset the logcontext. In other words, it turns a function that follows the Synapse rules about logcontexts and Deferreds into one which behaves more like an external function — the opposite operation to that described in the previous section. @@ -293,7 +293,7 @@ It can be used like this: def do_request_handling(): yield foreground_operation() - logcontext.preserve_fn(background_operation)() + logcontext.run_in_background(background_operation) # this will now be logged against the request context logger.debug("Request handling complete") diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a8dea15c1b..d660ec785b 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -292,36 +292,41 @@ class PreserveLoggingContext(object): def preserve_fn(f): - """Wraps a function, to ensure that the current context is restored after + """Function decorator which wraps the function with run_in_background""" + def g(*args, **kwargs): + return run_in_background(f, *args, **kwargs) + return g + + +def run_in_background(f, *args, **kwargs): + """Calls a function, ensuring that the current context is restored after return from the function, and that the sentinel context is set once the deferred returned by the funtion completes. Useful for wrapping functions that return a deferred which you don't yield on. """ - def g(*args, **kwargs): - current = LoggingContext.current_context() - res = f(*args, **kwargs) - if isinstance(res, defer.Deferred) and not res.called: - # The function will have reset the context before returning, so - # we need to restore it now. - LoggingContext.set_current_context(current) - - # The original context will be restored when the deferred - # completes, but there is nothing waiting for it, so it will - # get leaked into the reactor or some other function which - # wasn't expecting it. We therefore need to reset the context - # here. - # - # (If this feels asymmetric, consider it this way: we are - # effectively forking a new thread of execution. We are - # probably currently within a ``with LoggingContext()`` block, - # which is supposed to have a single entry and exit point. But - # by spawning off another deferred, we are effectively - # adding a new exit point.) - res.addBoth(_set_context_cb, LoggingContext.sentinel) - return res - return g + current = LoggingContext.current_context() + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred) and not res.called: + # The function will have reset the context before returning, so + # we need to restore it now. + LoggingContext.set_current_context(current) + + # The original context will be restored when the deferred + # completes, but there is nothing waiting for it, so it will + # get leaked into the reactor or some other function which + # wasn't expecting it. We therefore need to reset the context + # here. + # + # (If this feels asymmetric, consider it this way: we are + # effectively forking a new thread of execution. We are + # probably currently within a ``with LoggingContext()`` block, + # which is supposed to have a single entry and exit point. But + # by spawning off another deferred, we are effectively + # adding a new exit point.) + res.addBoth(_set_context_cb, LoggingContext.sentinel) + return res def make_deferred_yieldable(deferred): -- cgit 1.5.1 From e48c7aac4d827b66182adf80ab9804f42db186c9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 8 Mar 2018 11:47:28 +0000 Subject: Add transactional API to history purge Make the purge request return quickly, and allow scripts to poll for updates. --- docs/admin_api/purge_history_api.rst | 27 +++++++++ synapse/handlers/message.py | 104 +++++++++++++++++++++++++++++++++-- synapse/rest/client/v1/admin.py | 38 ++++++++++++- 3 files changed, 161 insertions(+), 8 deletions(-) (limited to 'docs') diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index acf1bc5749..ea2922da5c 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -32,3 +32,30 @@ specified by including an event_id in the URI, or by setting a id is given, that event (and others at the same graph depth) will be retained. If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch, in milliseconds. + +The API starts the purge running, and returns immediately with a JSON body with +a purge id: + +.. code:: json + + { + "purge_id": "" + } + +Purge status query +------------------ + +It is possible to poll for updates on recent purges with a second API; + +``GET /_matrix/client/r0/admin/purge_history_status/`` + +(again, with a suitable ``access_token``). This API returns a JSON body like +the following: + +.. code:: json + + { + "status": "active" + } + +The status will be one of ``active``, ``complete``, or ``failed``. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6eb8d19dc9..42aab91c50 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -13,7 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError @@ -24,9 +25,10 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, run_in_background from synapse.util.metrics import measure_func from synapse.util.frozenutils import unfreeze +from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client from synapse.replication.http.send_event import send_event_to_master @@ -41,6 +43,36 @@ import ujson logger = logging.getLogger(__name__) +class PurgeStatus(object): + """Object tracking the status of a purge request + + This class contains information on the progress of a purge request, for + return by get_purge_status. + + Attributes: + status (int): Tracks whether this request has completed. One of + STATUS_{ACTIVE,COMPLETE,FAILED} + """ + + STATUS_ACTIVE = 0 + STATUS_COMPLETE = 1 + STATUS_FAILED = 2 + + STATUS_TEXT = { + STATUS_ACTIVE: "active", + STATUS_COMPLETE: "complete", + STATUS_FAILED: "failed", + } + + def __init__(self): + self.status = PurgeStatus.STATUS_ACTIVE + + def asdict(self): + return { + "status": PurgeStatus.STATUS_TEXT[self.status] + } + + class MessageHandler(BaseHandler): def __init__(self, hs): @@ -51,25 +83,87 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() self._purges_in_progress_by_room = set() + # map from purge id to PurgeStatus + self._purges_by_id = {} - @defer.inlineCallbacks - def purge_history(self, room_id, topological_ordering, - delete_local_events=False): + def start_purge_history(self, room_id, topological_ordering, + delete_local_events=False): + """Start off a history purge on a room. + + Args: + room_id (str): The room to purge from + + topological_ordering (int): minimum topo ordering to preserve + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + str: unique ID for this purge transaction. + """ if room_id in self._purges_in_progress_by_room: raise SynapseError( 400, "History purge already in progress for %s" % (room_id, ), ) + purge_id = random_string(16) + + # we log the purge_id here so that it can be tied back to the + # request id in the log lines. + logger.info("[purge] starting purge_id %s", purge_id) + + self._purges_by_id[purge_id] = PurgeStatus() + run_in_background( + self._purge_history, + purge_id, room_id, topological_ordering, delete_local_events, + ) + return purge_id + + @defer.inlineCallbacks + def _purge_history(self, purge_id, room_id, topological_ordering, + delete_local_events): + """Carry out a history purge on a room. + + Args: + purge_id (str): The id for this purge + room_id (str): The room to purge from + topological_ordering (int): minimum topo ordering to preserve + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + Deferred + """ self._purges_in_progress_by_room.add(room_id) try: with (yield self.pagination_lock.write(room_id)): yield self.store.purge_history( room_id, topological_ordering, delete_local_events, ) + logger.info("[purge] complete") + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + except Exception: + logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED finally: self._purges_in_progress_by_room.discard(room_id) + # remove the purge from the list 24 hours after it completes + def clear_purge(): + del self._purges_by_id[purge_id] + reactor.callLater(24 * 3600, clear_purge) + + def get_purge_status(self, purge_id): + """Get the current status of an active purge + + Args: + purge_id (str): purge_id returned by start_purge_history + + Returns: + PurgeStatus|None + """ + return self._purges_by_id.get(purge_id) + @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True, event_filter=None): diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index dcf6215dad..303419d281 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError from synapse.types import UserID, create_requester from synapse.http.servlet import parse_json_object_from_request @@ -185,12 +185,43 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): errcode=Codes.BAD_JSON, ) - yield self.handlers.message_handler.purge_history( + purge_id = yield self.handlers.message_handler.start_purge_history( room_id, depth, delete_local_events=delete_local_events, ) - defer.returnValue((200, {})) + defer.returnValue((200, { + "purge_id": purge_id, + })) + + +class PurgeHistoryStatusRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns( + "/admin/purge_history_status/(?P[^/]+)" + ) + + def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer) + """ + super(PurgeHistoryStatusRestServlet, self).__init__(hs) + self.handlers = hs.get_handlers() + + @defer.inlineCallbacks + def on_GET(self, request, purge_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + + if not is_admin: + raise AuthError(403, "You are not a server admin") + + purge_status = self.handlers.message_handler.get_purge_status(purge_id) + if purge_status is None: + raise NotFoundError("purge id '%s' not found" % purge_id) + + defer.returnValue((200, purge_status.asdict())) class DeactivateAccountRestServlet(ClientV1RestServlet): @@ -561,6 +592,7 @@ class SearchUsersRestServlet(ClientV1RestServlet): def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) PurgeMediaCacheRestServlet(hs).register(http_server) + PurgeHistoryStatusRestServlet(hs).register(http_server) DeactivateAccountRestServlet(hs).register(http_server) PurgeHistoryRestServlet(hs).register(http_server) UsersRestServlet(hs).register(http_server) -- cgit 1.5.1 From c33c1ceddd5da8195b38059dce31255209075ba2 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Wed, 14 Mar 2018 11:09:08 -0600 Subject: OCD: Make the event_creator routes regex a code block All the others are code blocks, so this one should be to (currently it is a blockquote). Signed-off-by: Travis Ralston --- docs/workers.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'docs') diff --git a/docs/workers.rst b/docs/workers.rst index dee04bbf3e..80f8d2181a 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -230,7 +230,7 @@ file. For example:: ``synapse.app.event_creator`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Handles non-state event creation. It can handle REST endpoints matching: +Handles non-state event creation. It can handle REST endpoints matching:: ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send -- cgit 1.5.1 From 0ad5125814dc18a79423740ac54f96e16a427758 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 15 Mar 2018 11:05:42 +0000 Subject: Update purge_history_api.rst clarify that `purge_history` will not purge state --- docs/admin_api/purge_history_api.rst | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'docs') diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index ea2922da5c..2da833c827 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -16,9 +16,11 @@ including an ``access_token`` of a server admin. By default, events sent by local users are not deleted, as they may represent the only copies of this content in existence. (Events sent by remote users are -deleted, and room state data before the cutoff is always removed). +deleted.) -To delete local events as well, set ``delete_local_events`` in the body: +Room state data (such as joins, leaves, topic) is always preserved. + +To delete local message events as well, set ``delete_local_events`` in the body: .. code:: json -- cgit 1.5.1