From cfe1ff4bdb9296ff2a7dc167dabf4397f81634f7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 Apr 2016 16:33:05 +0100 Subject: Add a replication endpoint for deleting pushers --- synapse/replication/resource.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/replication/resource.py') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index a543af68f8..e5c9a53929 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -15,6 +15,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request +from synapse.replication.pusher_resource import PusherResource from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -102,8 +103,6 @@ class ReplicationResource(Resource): long-polling this replication API for new data on those streams. """ - isLeaf = True - def __init__(self, hs): Resource.__init__(self) # Resource is old-style, so no super() @@ -114,6 +113,8 @@ class ReplicationResource(Resource): self.typing_handler = hs.get_handlers().typing_notification_handler self.notifier = hs.notifier + self.putChild("remove_pushers", PusherResource(hs)) + def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET @@ -343,7 +344,7 @@ class ReplicationResource(Resource): "app_id", "app_display_name", "device_display_name", "pushkey", "ts", "lang", "data" )) - writer.write_header_and_rows("deleted", deleted, ( + writer.write_header_and_rows("deleted_pushers", deleted, ( "position", "user_id", "app_id", "pushkey" )) -- cgit 1.4.1 From 8a656664544fbc23db618aa855cc61ac54d9afeb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 27 Apr 2016 15:38:43 +0100 Subject: Fix backfill replication to advance the stream correctly --- synapse/replication/resource.py | 2 +- synapse/replication/slave/storage/events.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/replication/resource.py') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index e5c9a53929..149fc4c650 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -382,7 +382,7 @@ class _Writer(object): position = rows[-1][0] self.streams[name] = { - "position": str(position), + "position": position if type(position) is int else str(position), "field_names": fields, "rows": rows, } diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 5f37ba6995..86f00b6ff5 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -118,7 +118,7 @@ class SlavedEventStore(BaseSlavedStore): def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() - result["backfill"] = self._backfill_id_gen.get_current_token() + result["backfill"] = -self._backfill_id_gen.get_current_token() return result def process_replication(self, result): @@ -136,7 +136,7 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("backfill") if stream: - self._backfill_id_gen.advance(stream["position"]) + self._backfill_id_gen.advance(-stream["position"]) for row in stream["rows"]: self._process_replication_row( row, backfilled=True, state_resets=state_resets -- cgit 1.4.1 From 8d7ad44331d7eff4a140b1e4777532d8a3fb26cb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Apr 2016 10:57:49 +0100 Subject: Report per request metrics for all of the things using request_handler --- synapse/http/server.py | 101 ++++++++++++++++---------- synapse/replication/pusher_resource.py | 3 +- synapse/replication/resource.py | 3 +- synapse/rest/key/v1/server_key_resource.py | 1 - synapse/rest/key/v2/remote_key_resource.py | 4 +- synapse/rest/media/v1/download_resource.py | 3 +- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/rest/media/v1/thumbnail_resource.py | 3 +- synapse/rest/media/v1/upload_resource.py | 3 +- 9 files changed, 76 insertions(+), 47 deletions(-) (limited to 'synapse/replication/resource.py') diff --git a/synapse/http/server.py b/synapse/http/server.py index b82196fd5e..d4d639f617 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -74,7 +74,12 @@ response_db_txn_duration = metrics.register_distribution( _next_request_id = 0 -def request_handler(request_handler): +def request_handler(report_metrics=True): + """Decorator for ``wrap_request_handler``""" + return lambda request_handler: wrap_request_handler(request_handler, report_metrics) + + +def wrap_request_handler(request_handler, report_metrics): """Wraps a method that acts as a request handler with the necessary logging and exception handling. @@ -96,6 +101,10 @@ def request_handler(request_handler): global _next_request_id request_id = "%s-%s" % (request.method, _next_request_id) _next_request_id += 1 + if report_metrics: + request_metrics = RequestMetrics() + request_metrics.start(self.clock) + with LoggingContext(request_id) as request_context: request_context.request = request_id with request.processing(): @@ -133,6 +142,13 @@ def request_handler(request_handler): }, send_cors=True ) + finally: + try: + request_metrics.stop( + self.clock, request, self.__class__.__name__ + ) + except: + pass return wrapped_request_handler @@ -197,19 +213,19 @@ class JsonResource(HttpServer, resource.Resource): self._async_render(request) return server.NOT_DONE_YET - @request_handler + @request_handler(report_metrics=False) @defer.inlineCallbacks def _async_render(self, request): """ This gets called from render() every time someone sends us a request. This checks if anyone has registered a callback for that method and path. """ - start = self.clock.time_msec() if request.method == "OPTIONS": self._send_response(request, 200, {}) return - start_context = LoggingContext.current_context() + request_metrics = RequestMetrics() + request_metrics.start(self.clock) # Loop through all the registered callbacks to check if the method # and path regex match @@ -241,40 +257,7 @@ class JsonResource(HttpServer, resource.Resource): self._send_response(request, code, response) try: - context = LoggingContext.current_context() - - tag = "" - if context: - tag = context.tag - - if context != start_context: - logger.warn( - "Context have unexpectedly changed %r, %r", - context, self.start_context - ) - return - - incoming_requests_counter.inc(request.method, servlet_classname, tag) - - response_timer.inc_by( - self.clock.time_msec() - start, request.method, - servlet_classname, tag - ) - - ru_utime, ru_stime = context.get_resource_usage() - - response_ru_utime.inc_by( - ru_utime, request.method, servlet_classname, tag - ) - response_ru_stime.inc_by( - ru_stime, request.method, servlet_classname, tag - ) - response_db_txn_count.inc_by( - context.db_txn_count, request.method, servlet_classname, tag - ) - response_db_txn_duration.inc_by( - context.db_txn_duration, request.method, servlet_classname, tag - ) + request_metrics.stop(self.clock, request, servlet_classname) except: pass @@ -307,6 +290,48 @@ class JsonResource(HttpServer, resource.Resource): ) +class RequestMetrics(object): + def start(self, clock): + self.start = clock.time_msec() + self.start_context = LoggingContext.current_context() + + def stop(self, clock, request, servlet_classname): + context = LoggingContext.current_context() + + tag = "" + if context: + tag = context.tag + + if context != start_context: + logger.warn( + "Context have unexpectedly changed %r, %r", + context, self.start_context + ) + return + + incoming_requests_counter.inc(request.method, servlet_classname, tag) + + response_timer.inc_by( + self.clock.time_msec() - start, request.method, + servlet_classname, tag + ) + + ru_utime, ru_stime = context.get_resource_usage() + + response_ru_utime.inc_by( + ru_utime, request.method, servlet_classname, tag + ) + response_ru_stime.inc_by( + ru_stime, request.method, servlet_classname, tag + ) + response_db_txn_count.inc_by( + context.db_txn_count, request.method, servlet_classname, tag + ) + response_db_txn_duration.inc_by( + context.db_txn_duration, request.method, servlet_classname, tag + ) + + class RootRedirect(resource.Resource): """Redirects the root '/' path to another path.""" diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py index b87026d79a..9b01ab3c13 100644 --- a/synapse/replication/pusher_resource.py +++ b/synapse/replication/pusher_resource.py @@ -31,12 +31,13 @@ class PusherResource(Resource): self.version_string = hs.version_string self.store = hs.get_datastore() self.notifier = hs.get_notifier() + self.clock = hs.get_clock() def render_POST(self, request): self._async_render_POST(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_POST(self, request): content = parse_json_object_from_request(request) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 149fc4c650..ff78c60f13 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -112,6 +112,7 @@ class ReplicationResource(Resource): self.presence_handler = hs.get_handlers().presence_handler self.typing_handler = hs.get_handlers().typing_notification_handler self.notifier = hs.notifier + self.clock = hs.get_clock() self.putChild("remove_pushers", PusherResource(hs)) @@ -139,7 +140,7 @@ class ReplicationResource(Resource): state_token, )) - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): limit = parse_integer(request, "limit", 100) diff --git a/synapse/rest/key/v1/server_key_resource.py b/synapse/rest/key/v1/server_key_resource.py index 3db3838b7e..bd4fea5774 100644 --- a/synapse/rest/key/v1/server_key_resource.py +++ b/synapse/rest/key/v1/server_key_resource.py @@ -49,7 +49,6 @@ class LocalKey(Resource): """ def __init__(self, hs): - self.hs = hs self.version_string = hs.version_string self.response_body = encode_canonical_json( self.response_json_object(hs.config) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 9552016fec..7209d5a37d 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -97,7 +97,7 @@ class RemoteKey(Resource): self.async_render_GET(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def async_render_GET(self, request): if len(request.postpath) == 1: @@ -122,7 +122,7 @@ class RemoteKey(Resource): self.async_render_POST(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def async_render_POST(self, request): content = parse_json_object_from_request(request) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 510884262c..9f69620772 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -36,12 +36,13 @@ class DownloadResource(Resource): self.server_name = hs.hostname self.store = hs.get_datastore() self.version_string = hs.version_string + self.clock = hs.get_clock() def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): server_name, media_id, name = parse_media_id(request) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 69327ac493..dc1e5fbdb3 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -74,7 +74,7 @@ class PreviewUrlResource(Resource): self._async_render_GET(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 234dd4261c..0b9e1de1a7 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -39,12 +39,13 @@ class ThumbnailResource(Resource): self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.server_name = hs.hostname self.version_string = hs.version_string + self.clock = hs.get_clock() def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): server_name, media_id, _ = parse_media_id(request) diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 299e1f6e56..b716d1d892 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -41,6 +41,7 @@ class UploadResource(Resource): self.auth = hs.get_auth() self.max_upload_size = hs.config.max_upload_size self.version_string = hs.version_string + self.clock = hs.get_clock() def render_POST(self, request): self._async_render_POST(request) @@ -50,7 +51,7 @@ class UploadResource(Resource): respond_with_json(request, 200, {}, send_cors=True) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_POST(self, request): requester = yield self.auth.get_user_by_req(request) -- cgit 1.4.1