From e99365f6015af6dc0c2c107f47bda3760ff1153e Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 19 Apr 2016 15:22:14 +0100
Subject: Replicate get_invited_rooms_for_user

---
 synapse/replication/slave/storage/events.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

(limited to 'synapse/replication')

diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cfc728a038..82f171c257 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -68,6 +68,9 @@ class SlavedEventStore(BaseSlavedStore):
     _get_current_state_for_key = StateStore.__dict__[
         "_get_current_state_for_key"
     ]
+    get_invited_rooms_for_user = RoomMemberStore.__dict__[
+        "get_invited_rooms_for_user"
+    ]

     get_event = DataStore.get_event.__func__
     get_current_state = DataStore.get_current_state.__func__
@@ -82,6 +85,7 @@ class SlavedEventStore(BaseSlavedStore):
     get_room_events_stream_for_room = (
         DataStore.get_room_events_stream_for_room.__func__
     )
+    _set_before_and_after = DataStore._set_before_and_after

     _get_events = DataStore._get_events.__func__
@@ -147,11 +151,11 @@ class SlavedEventStore(BaseSlavedStore):
             internal = json.loads(row[1])
             event_json = json.loads(row[2])
             event = FrozenEvent(event_json, internal_metadata_dict=internal)
-            self._invalidate_caches_for_event(
+            self.invalidate_caches_for_event(
                 event, backfilled, reset_state=position in state_resets
             )

-    def _invalidate_caches_for_event(self, event, backfilled, reset_state):
+    def invalidate_caches_for_event(self, event, backfilled, reset_state):
         if reset_state:
             self._get_current_state_for_key.invalidate_all()
             self.get_rooms_for_user.invalidate_all()
@@ -182,6 +186,7 @@ class SlavedEventStore(BaseSlavedStore):
             #     self._membership_stream_cache.entity_has_changed(
             #         event.state_key, event.internal_metadata.stream_ordering
             #     )
+            self.get_invited_rooms_for_user.invalidate((event.state_key,))

         if not event.is_state():
             return
-- 
cgit 1.5.1

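Both this patch and the slaved stores added below lean on the same trick: borrow read-only methods from the master DataStore classes, taking cached methods out of the class __dict__ (so the caching descriptor and its invalidate() hook come along) and plain methods via .__func__ (so only the underlying function is reused). A toy sketch of the distinction, with a made-up CachedMethod descriptor standing in for synapse's real @cached machinery, and assuming Python 2 method semantics, which is what this code ran under:

    class CachedMethod(object):
        """Minimal stand-in for a caching descriptor with an invalidate() hook."""
        def __init__(self, func):
            self.func = func
            self.cache = {}

        def __get__(self, obj, objtype=None):
            def call(*args):
                if args not in self.cache:
                    self.cache[args] = self.func(obj, *args)
                return self.cache[args]
            call.invalidate = lambda key: self.cache.pop(key, None)
            return call


    class MasterStore(object):
        @CachedMethod
        def get_thing(self, key):          # cached read
            return "thing-%s" % (key,)

        def get_other(self, key):          # uncached read
            return "other-%s" % (key,)


    class SlavedStore(object):
        # __dict__ access keeps the descriptor, so the slave gets the cache
        # *and* a get_thing.invalidate((key,)) hook to call during replication.
        get_thing = MasterStore.__dict__["get_thing"]
        # .__func__ strips the unbound method down to the plain function.
        get_other = MasterStore.get_other.__func__
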
From 5bbd424ee0c983d305014df618fa1917ecd10d91 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 19 Apr 2016 17:11:44 +0100
Subject: Add a slaved receipts store

---
 synapse/replication/slave/storage/receipts.py    | 61 ++++++++++++++++++++++++
 tests/replication/slave/storage/_base.py         |  4 +-
 tests/replication/slave/storage/test_events.py   |  3 ++
 tests/replication/slave/storage/test_receipts.py | 39 +++++++++++++++
 4 files changed, 104 insertions(+), 3 deletions(-)
 create mode 100644 synapse/replication/slave/storage/receipts.py
 create mode 100644 tests/replication/slave/storage/test_receipts.py

(limited to 'synapse/replication')

diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
new file mode 100644
index 0000000000..b55d5dfd08
--- /dev/null
+++ b/synapse/replication/slave/storage/receipts.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.storage import DataStore
+from synapse.storage.receipts import ReceiptsStore
+
+# So, um, we want to borrow a load of functions intended for reading from
+# a DataStore, but we don't want to take functions that either write to the
+# DataStore or are cached and don't have cache invalidation logic.
+#
+# Rather than write duplicate versions of those functions, or lift them to
+# a common base class, we going to grab the underlying __func__ object from
+# the method descriptor on the DataStore and chuck them into our class.
+
+
+class SlavedReceiptsStore(BaseSlavedStore):
+
+    def __init__(self, db_conn, hs):
+        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
+
+        self._receipts_id_gen = SlavedIdTracker(
+            db_conn, "receipts_linearized", "stream_id"
+        )
+
+    get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+
+    get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
+    get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+
+    def stream_positions(self):
+        result = super(SlavedReceiptsStore, self).stream_positions()
+        result["receipts"] = self._receipts_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("receipts")
+        if stream:
+            self._receipts_id_gen.advance(stream["position"])
+            for row in stream["rows"]:
+                room_id, receipt_type, user_id = row[1:4]
+                self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
+
+        return super(SlavedReceiptsStore, self).process_replication(result)
+
+    def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
+        self.get_receipts_for_user.invalidate((user_id, receipt_type))
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 983caafe8a..1f13cd0bc0 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -15,8 +15,6 @@
 from twisted.internet import defer
 from tests import unittest

-from synapse.replication.slave.storage.events import SlavedEventStore
-
 from mock import Mock, NonCallableMock
 from tests.utils import setup_test_homeserver
 from synapse.replication.resource import ReplicationResource
@@ -38,7 +36,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
         self.replication = ReplicationResource(self.hs)

         self.master_store = self.hs.get_datastore()
-        self.slaved_store = SlavedEventStore(self.hs.get_db_conn(), self.hs)
+        self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs)
         self.event_id = 0

     @defer.inlineCallbacks
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index baa4a26eb5..66f166047e 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -16,6 +16,7 @@ from ._base import BaseSlavedStoreTestCase

 from synapse.events import FrozenEvent, _EventInternalMetadata
 from synapse.events.snapshot import EventContext
+from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.storage.roommember import RoomsForUser

 from twisted.internet import defer
@@ -43,6 +44,8 @@ def patch__eq__(cls):

 class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):

+    STORE_TYPE = SlavedEventStore
+
     def setUp(self):
         # Patch up the equality operator for events so that we can check
         # whether lists of events match using assertEquals
diff --git a/tests/replication/slave/storage/test_receipts.py b/tests/replication/slave/storage/test_receipts.py
new file mode 100644
index 0000000000..6624fe4eea
--- /dev/null
+++ b/tests/replication/slave/storage/test_receipts.py
@@ -0,0 +1,39 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStoreTestCase
+
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+
+from twisted.internet import defer
+
+USER_ID = "@feeling:blue"
+ROOM_ID = "!room:blue"
+EVENT_ID = "$event:blue"
+
+
+class SlavedReceiptTestCase(BaseSlavedStoreTestCase):
+
+    STORE_TYPE = SlavedReceiptsStore
+
+    @defer.inlineCallbacks
+    def test_receipt(self):
+        yield self.check("get_receipts_for_user", [USER_ID, "m.read"], {})
+        yield self.master_store.insert_receipt(
+            ROOM_ID, "m.read", USER_ID, [EVENT_ID], {}
+        )
+        yield self.replicate()
+        yield self.check("get_receipts_for_user", [USER_ID, "m.read"], {
+            ROOM_ID: EVENT_ID
+        })
-- 
cgit 1.5.1

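The two hooks each slaved store grows here, stream_positions() and process_replication(result), are the client side of the replication protocol: the worker reports how far it has got on each stream, then applies whatever the master sends back. A rough sketch of the loop that ties them together (fetch_replication is a hypothetical stand-in for the HTTP long-poll against the ReplicationResource; the real transport and response layout carry more fields):

    from twisted.internet import defer


    @defer.inlineCallbacks
    def replication_loop(store, fetch_replication):
        while True:
            # Ask the slaved store how far it has got on each stream it follows.
            positions = store.stream_positions()      # e.g. {"receipts": 12}
            # Long-poll the master for rows newer than those positions.
            result = yield fetch_replication(positions)
            # Let the store advance its id trackers and invalidate caches.
            yield defer.maybeDeferred(store.process_replication, result)
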
From c0d8e0eb6375d5b8754db420af43733a642c7f38 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 21 Apr 2016 15:25:47 +0100
Subject: Replicate push actions

---
 synapse/replication/slave/storage/events.py    | 14 +++++++++
 tests/replication/slave/storage/test_events.py | 43 ++++++++++++++++++++++++
 2 files changed, 57 insertions(+)

(limited to 'synapse/replication')

diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 82f171c257..5f37ba6995 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -21,6 +21,7 @@
 from synapse.storage import DataStore
 from synapse.storage.room import RoomStore
 from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.event_federation import EventFederationStore
+from synapse.storage.event_push_actions import EventPushActionsStore
 from synapse.storage.state import StateStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -71,7 +72,16 @@ class SlavedEventStore(BaseSlavedStore):
     get_invited_rooms_for_user = RoomMemberStore.__dict__[
         "get_invited_rooms_for_user"
     ]
+    get_unread_event_push_actions_by_room_for_user = (
+        EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
+    )

+    get_unread_push_actions_for_user_in_range = (
+        DataStore.get_unread_push_actions_for_user_in_range.__func__
+    )
+    get_push_action_users_in_range = (
+        DataStore.get_push_action_users_in_range.__func__
+    )
     get_event = DataStore.get_event.__func__
     get_current_state = DataStore.get_current_state.__func__
     get_current_state_for_key = DataStore.get_current_state_for_key.__func__
@@ -167,6 +177,10 @@ class SlavedEventStore(BaseSlavedStore):

         self.get_latest_event_ids_in_room.invalidate((event.room_id,))

+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+            (event.room_id,)
+        )
+
         if not backfilled:
             self._events_stream_cache.entity_has_changed(
                 event.room_id, event.internal_metadata.stream_ordering
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 41a626cf70..17587fda00 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -266,6 +266,47 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             event.internal_metadata.stream_ordering
         )])

+    @defer.inlineCallbacks
+    def test_push_actions_for_user(self):
+        yield self.persist(type="m.room.create", creator=USER_ID)
+        yield self.persist(type="m.room.join", key=USER_ID, membership="join")
+        yield self.persist(
+            type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"
+        )
+        event1 = yield self.persist(
+            type="m.room.message", msgtype="m.text", body="hello"
+        )
+        yield self.replicate()
+        yield self.check(
+            "get_unread_event_push_actions_by_room_for_user",
+            [ROOM_ID, USER_ID_2, event1.event_id],
+            {"highlight_count": 0, "notify_count": 0}
+        )
+
+        yield self.persist(
+            type="m.room.message", msgtype="m.text", body="world",
+            push_actions=[(USER_ID_2, ["notify"])],
+        )
+        yield self.replicate()
+        yield self.check(
+            "get_unread_event_push_actions_by_room_for_user",
+            [ROOM_ID, USER_ID_2, event1.event_id],
+            {"highlight_count": 0, "notify_count": 1}
+        )
+
+        yield self.persist(
+            type="m.room.message", msgtype="m.text", body="world",
+            push_actions=[(USER_ID_2, [
+                "notify", {"set_tweak": "highlight", "value": True}
+            ])],
+        )
+        yield self.replicate()
+        yield self.check(
+            "get_unread_event_push_actions_by_room_for_user",
+            [ROOM_ID, USER_ID_2, event1.event_id],
+            {"highlight_count": 1, "notify_count": 2}
+        )
+
     event_id = 0

     @defer.inlineCallbacks
@@ -273,6 +314,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, internal={},
         state=None, reset_state=False, backfill=False,
         depth=None, prev_events=[], auth_events=[], prev_state=[], redacts=None,
+        push_actions=[],
         **content
     ):
         """
@@ -305,6 +347,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.event_id += 1

         context = EventContext(current_state=state)
+        context.push_actions = push_actions

         ordering = None
         if backfill:
-- 
cgit 1.5.1

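The invalidate_many((event.room_id,)) call above works because the cache is keyed on the full argument tuple — (room_id, user_id, last_read_event_id), as the new test exercises — and a new event in a room has to drop the cached counts for every user in that room, not just one key. A toy prefix-invalidating cache (not synapse's implementation) makes the idea concrete:

    class PrefixCache(object):
        def __init__(self):
            self.entries = {}                  # full key tuple -> cached value

        def invalidate(self, key):
            self.entries.pop(key, None)

        def invalidate_many(self, key_prefix):
            n = len(key_prefix)
            for key in list(self.entries):
                if key[:n] == key_prefix:      # drop every entry under the prefix
                    del self.entries[key]


    cache = PrefixCache()
    cache.entries[("!room:blue", "@user1:blue", "$event1:blue")] = {"notify_count": 1}
    cache.entries[("!room:blue", "@user2:blue", "$event1:blue")] = {"notify_count": 0}
    cache.invalidate_many(("!room:blue",))     # both users' counts are cleared
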
From d4823efad9d1d881c12c779e9e94691d90d8902b Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 21 Apr 2016 16:18:00 +0100
Subject: Replicate the pushers

---
 synapse/replication/slave/storage/pushers.py | 52 ++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
 create mode 100644 synapse/replication/slave/storage/pushers.py

(limited to 'synapse/replication')

diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
new file mode 100644
index 0000000000..8faddb2595
--- /dev/null
+++ b/synapse/replication/slave/storage/pushers.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.storage import DataStore
+
+
+class SlavedPusherStore(BaseSlavedStore):
+
+    def __init__(self, db_conn, hs):
+        super(SlavedPusherStore, self).__init__(db_conn, hs)
+        self._pushers_id_gen = SlavedIdTracker(
+            db_conn, "pushers", "id",
+            extra_tables=[("deleted_pushers", "stream_id")],
+        )
+
+    get_all_pushers = DataStore.get_all_pushers.__func__
+    get_pushers_by = DataStore.get_pushers_by.__func__
+    get_pushers_by_app_id_and_pushkey = (
+        DataStore.get_pushers_by_app_id_and_pushkey.__func__
+    )
+    _decode_pushers_rows = DataStore._decode_pushers_rows.__func__
+
+    def stream_positions(self):
+        result = super(SlavedPusherStore, self).stream_positions()
+        result["pushers"] = self._pushers_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("pushers")
+        if stream:
+            self._pushers_id_gen.advance(stream["position"])
+
+        stream = result.get("deleted_pushers")
+        if stream:
+            self._pushers_id_gen.advance(stream["position"])
+
+        return super(SlavedPusherStore, self).process_replication(result)
-- 
cgit 1.5.1

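What the SlavedIdTracker above has to provide, conceptually: seed the token from the largest id already in the main table and any extra_tables (here deleted_pushers.stream_id, so pusher deletions move the same token forward), then advance monotonically as process_replication() reports progress. An illustrative sketch only — not synapse's actual _slaved_id_tracker implementation:

    class ToyIdTracker(object):
        def __init__(self, db_conn, table, column, extra_tables=()):
            cur = db_conn.cursor()
            current = 0
            for tbl, col in [(table, column)] + list(extra_tables):
                cur.execute("SELECT COALESCE(MAX(%s), 0) FROM %s" % (col, tbl))
                current = max(current, cur.fetchone()[0])
            self._current = current

        def advance(self, new_id):
            self._current = max(self._current, new_id)

        def get_current_token(self):
            return self._current
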
From cfe1ff4bdb9296ff2a7dc167dabf4397f81634f7 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 21 Apr 2016 16:33:05 +0100
Subject: Add a replication endpoint for deleting pushers

---
 synapse/replication/pusher_resource.py | 53 ++++++++++++++++++++++++++++
 synapse/replication/resource.py        |  7 +++--
 2 files changed, 57 insertions(+), 3 deletions(-)
 create mode 100644 synapse/replication/pusher_resource.py

(limited to 'synapse/replication')

diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py
new file mode 100644
index 0000000000..b87026d79a
--- /dev/null
+++ b/synapse/replication/pusher_resource.py
@@ -0,0 +1,53 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import defer
+
+
+class PusherResource(Resource):
+    """
+    HTTP endpoint for deleting rejected pushers
+    """
+
+    def __init__(self, hs):
+        Resource.__init__(self)  # Resource is old-style, so no super()
+
+        self.version_string = hs.version_string
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+
+    def render_POST(self, request):
+        self._async_render_POST(request)
+        return NOT_DONE_YET
+
+    @request_handler
+    @defer.inlineCallbacks
+    def _async_render_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        for remove in content["remove"]:
+            yield self.store.delete_pusher_by_app_id_pushkey_user_id(
+                remove["app_id"],
+                remove["push_key"],
+                remove["user_id"],
+            )
+
+        self.notifier.on_new_replication_data()
+
+        respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index a543af68f8..e5c9a53929 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -15,6 +15,7 @@
 from synapse.http.servlet import parse_integer, parse_string
 from synapse.http.server import request_handler, finish_request
+from synapse.replication.pusher_resource import PusherResource

 from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
@@ -102,8 +103,6 @@ class ReplicationResource(Resource):
     long-polling this replication API for new data on those streams.
     """

-    isLeaf = True
-
     def __init__(self, hs):
         Resource.__init__(self)  # Resource is old-style, so no super()

@@ -114,6 +113,8 @@ class ReplicationResource(Resource):
         self.typing_handler = hs.get_handlers().typing_notification_handler
         self.notifier = hs.notifier

+        self.putChild("remove_pushers", PusherResource(hs))
+
     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET
@@ -343,7 +344,7 @@ class ReplicationResource(Resource):
             "app_id", "app_display_name", "device_display_name", "pushkey",
             "ts", "lang", "data"
         ))
-        writer.write_header_and_rows("deleted", deleted, (
+        writer.write_header_and_rows("deleted_pushers", deleted, (
             "position", "user_id", "app_id", "pushkey"
         ))
-- 
cgit 1.5.1

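The request body the new remove_pushers endpoint expects can be read straight off the handler: a JSON object with a "remove" list of app_id / push_key / user_id triples. The values below are examples only, and the full URL depends on where the ReplicationResource itself is mounted — the "remove_pushers" path segment comes from the putChild() call:

    body = {
        "remove": [
            {
                "app_id": "org.matrix.example",   # example values, not real pushers
                "push_key": "abc123",
                "user_id": "@feeling:blue",
            },
        ],
    }

    # POST <replication prefix>/remove_pushers with body as JSON.  The handler
    # deletes each pusher and then pokes the notifier so replication clients
    # waiting on the long-poll wake up.
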
From 8a656664544fbc23db618aa855cc61ac54d9afeb Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Wed, 27 Apr 2016 15:38:43 +0100
Subject: Fix backfill replication to advance the stream correctly

---
 synapse/replication/resource.py             | 2 +-
 synapse/replication/slave/storage/events.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

(limited to 'synapse/replication')

diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index e5c9a53929..149fc4c650 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -382,7 +382,7 @@ class _Writer(object):
             position = rows[-1][0]

         self.streams[name] = {
-            "position": str(position),
+            "position": position if type(position) is int else str(position),
             "field_names": fields,
             "rows": rows,
         }
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 5f37ba6995..86f00b6ff5 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -118,7 +118,7 @@ class SlavedEventStore(BaseSlavedStore):
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
-        result["backfill"] = self._backfill_id_gen.get_current_token()
+        result["backfill"] = -self._backfill_id_gen.get_current_token()
         return result

     def process_replication(self, result):
@@ -136,7 +136,7 @@ class SlavedEventStore(BaseSlavedStore):

         stream = result.get("backfill")
         if stream:
-            self._backfill_id_gen.advance(stream["position"])
+            self._backfill_id_gen.advance(-stream["position"])
             for row in stream["rows"]:
                 self._process_replication_row(
                     row, backfilled=True, state_resets=state_resets
-- 
cgit 1.5.1

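The sign flip in this patch exists because backfilled events are stored with negative stream orderings that move further from zero as older history is fetched, while the replication streams assume positions that only ever increase. Negating the token on the way out (stream_positions) and negating the replicated position on the way back in (advance) keeps both conventions happy. A worked example:

    backfill_token = -3                    # slave's local backfill position
    advertised = -backfill_token           # 3, the value put in result["backfill"]

    # The master later reports progress up to position 5 on the backfill stream,
    # i.e. events down to local ordering -5 have been replicated.
    replied_position = 5
    new_backfill_token = -replied_position # advance(-stream["position"]) -> -5
    assert new_backfill_token < backfill_token
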
""" - start = self.clock.time_msec() if request.method == "OPTIONS": self._send_response(request, 200, {}) return - start_context = LoggingContext.current_context() + request_metrics = RequestMetrics() + request_metrics.start(self.clock) # Loop through all the registered callbacks to check if the method # and path regex match @@ -241,40 +257,7 @@ class JsonResource(HttpServer, resource.Resource): self._send_response(request, code, response) try: - context = LoggingContext.current_context() - - tag = "" - if context: - tag = context.tag - - if context != start_context: - logger.warn( - "Context have unexpectedly changed %r, %r", - context, self.start_context - ) - return - - incoming_requests_counter.inc(request.method, servlet_classname, tag) - - response_timer.inc_by( - self.clock.time_msec() - start, request.method, - servlet_classname, tag - ) - - ru_utime, ru_stime = context.get_resource_usage() - - response_ru_utime.inc_by( - ru_utime, request.method, servlet_classname, tag - ) - response_ru_stime.inc_by( - ru_stime, request.method, servlet_classname, tag - ) - response_db_txn_count.inc_by( - context.db_txn_count, request.method, servlet_classname, tag - ) - response_db_txn_duration.inc_by( - context.db_txn_duration, request.method, servlet_classname, tag - ) + request_metrics.stop(self.clock, request, servlet_classname) except: pass @@ -307,6 +290,48 @@ class JsonResource(HttpServer, resource.Resource): ) +class RequestMetrics(object): + def start(self, clock): + self.start = clock.time_msec() + self.start_context = LoggingContext.current_context() + + def stop(self, clock, request, servlet_classname): + context = LoggingContext.current_context() + + tag = "" + if context: + tag = context.tag + + if context != start_context: + logger.warn( + "Context have unexpectedly changed %r, %r", + context, self.start_context + ) + return + + incoming_requests_counter.inc(request.method, servlet_classname, tag) + + response_timer.inc_by( + self.clock.time_msec() - start, request.method, + servlet_classname, tag + ) + + ru_utime, ru_stime = context.get_resource_usage() + + response_ru_utime.inc_by( + ru_utime, request.method, servlet_classname, tag + ) + response_ru_stime.inc_by( + ru_stime, request.method, servlet_classname, tag + ) + response_db_txn_count.inc_by( + context.db_txn_count, request.method, servlet_classname, tag + ) + response_db_txn_duration.inc_by( + context.db_txn_duration, request.method, servlet_classname, tag + ) + + class RootRedirect(resource.Resource): """Redirects the root '/' path to another path.""" diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py index b87026d79a..9b01ab3c13 100644 --- a/synapse/replication/pusher_resource.py +++ b/synapse/replication/pusher_resource.py @@ -31,12 +31,13 @@ class PusherResource(Resource): self.version_string = hs.version_string self.store = hs.get_datastore() self.notifier = hs.get_notifier() + self.clock = hs.get_clock() def render_POST(self, request): self._async_render_POST(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_POST(self, request): content = parse_json_object_from_request(request) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 149fc4c650..ff78c60f13 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -112,6 +112,7 @@ class ReplicationResource(Resource): self.presence_handler = hs.get_handlers().presence_handler self.typing_handler = 
diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py
index b87026d79a..9b01ab3c13 100644
--- a/synapse/replication/pusher_resource.py
+++ b/synapse/replication/pusher_resource.py
@@ -31,12 +31,13 @@ class PusherResource(Resource):
         self.version_string = hs.version_string
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
+        self.clock = hs.get_clock()

     def render_POST(self, request):
         self._async_render_POST(request)
         return NOT_DONE_YET

-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def _async_render_POST(self, request):
         content = parse_json_object_from_request(request)
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 149fc4c650..ff78c60f13 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -112,6 +112,7 @@ class ReplicationResource(Resource):
         self.presence_handler = hs.get_handlers().presence_handler
         self.typing_handler = hs.get_handlers().typing_notification_handler
         self.notifier = hs.notifier
+        self.clock = hs.get_clock()

         self.putChild("remove_pushers", PusherResource(hs))

@@ -139,7 +140,7 @@ class ReplicationResource(Resource):
             state_token,
         ))

-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def _async_render_GET(self, request):
         limit = parse_integer(request, "limit", 100)
diff --git a/synapse/rest/key/v1/server_key_resource.py b/synapse/rest/key/v1/server_key_resource.py
index 3db3838b7e..bd4fea5774 100644
--- a/synapse/rest/key/v1/server_key_resource.py
+++ b/synapse/rest/key/v1/server_key_resource.py
@@ -49,7 +49,6 @@ class LocalKey(Resource):
     """

     def __init__(self, hs):
-        self.hs = hs
         self.version_string = hs.version_string
         self.response_body = encode_canonical_json(
             self.response_json_object(hs.config)
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index 9552016fec..7209d5a37d 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -97,7 +97,7 @@ class RemoteKey(Resource):
         self.async_render_GET(request)
         return NOT_DONE_YET

-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def async_render_GET(self, request):
         if len(request.postpath) == 1:
@@ -122,7 +122,7 @@ class RemoteKey(Resource):
         self.async_render_POST(request)
         return NOT_DONE_YET

-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def async_render_POST(self, request):
         content = parse_json_object_from_request(request)
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index 510884262c..9f69620772 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -36,12 +36,13 @@ class DownloadResource(Resource):
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
         self.version_string = hs.version_string
+        self.clock = hs.get_clock()

     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET

-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def _async_render_GET(self, request):
         server_name, media_id, name = parse_media_id(request)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 69327ac493..dc1e5fbdb3 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -74,7 +74,7 @@ class PreviewUrlResource(Resource):
         self._async_render_GET(request)
         return NOT_DONE_YET

-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def _async_render_GET(self, request):

diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 234dd4261c..0b9e1de1a7 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -39,12 +39,13 @@ class ThumbnailResource(Resource):
         self.dynamic_thumbnails = hs.config.dynamic_thumbnails
         self.server_name = hs.hostname
         self.version_string = hs.version_string
+        self.clock = hs.get_clock()

     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET

-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def _async_render_GET(self, request):
         server_name, media_id, _ = parse_media_id(request)
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index 299e1f6e56..b716d1d892 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -41,6 +41,7 @@ class UploadResource(Resource):
         self.auth = hs.get_auth()
         self.max_upload_size = hs.config.max_upload_size
         self.version_string = hs.version_string
+        self.clock = hs.get_clock()

     def render_POST(self, request):
         self._async_render_POST(request)
@@ -50,7 +51,7 @@ class UploadResource(Resource):
         respond_with_json(request, 200, {}, send_cors=True)
         return NOT_DONE_YET

-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def _async_render_POST(self, request):
         requester = yield self.auth.get_user_by_req(request)
-- 
cgit 1.5.1
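The reason every call site in this last patch changes from "@request_handler" to "@request_handler()": request_handler is now a decorator factory that closes over report_metrics and returns the real decorator via wrap_request_handler, so it has to be called to produce something usable as a decorator. A stripped-down model of the same shape, with the logging, metrics and twisted machinery left out:

    def request_handler(report_metrics=True):
        """Returns the actual decorator, closing over the report_metrics flag."""
        return lambda func: wrap_request_handler(func, report_metrics)


    def wrap_request_handler(func, report_metrics):
        def wrapped(self, request):
            if report_metrics:
                pass  # here the real code starts/stops a RequestMetrics object
            return func(self, request)
        return wrapped


    class ExampleResource(object):
        @request_handler()                      # note the call: it returns the decorator
        def _async_render_GET(self, request):
            return "ok"

        # JsonResource opts out and keeps driving RequestMetrics itself, hence
        # @request_handler(report_metrics=False) in the patch above.
        @request_handler(report_metrics=False)
        def _async_render(self, request):
            return "ok"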