From fb76a81ff7615da46c043a0ee1e8b980756efe00 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 19 Apr 2016 14:45:05 +0100
Subject: Reorder imports

---
 synapse/rest/media/v1/preview_url_resource.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

(limited to 'synapse')

diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index fecdf8ed86..122b34faea 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -45,7 +45,15 @@ class PreviewUrlResource(Resource):
 
     def __init__(self, hs, media_repo):
         Resource.__init__(self)
+
+        self.auth = hs.get_auth()
+        self.clock = hs.get_clock()
+        self.version_string = hs.version_string
+        self.filepaths = media_repo.filepaths
+        self.max_spider_size = hs.config.max_spider_size
+        self.server_name = hs.hostname
         self.client = SpiderHttpClient(hs)
+
         if hasattr(hs.config, "url_preview_url_blacklist"):
             self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
 
@@ -60,13 +68,6 @@ class PreviewUrlResource(Resource):
 
         self.downloads = {}
 
-        self.auth = hs.get_auth()
-        self.clock = hs.get_clock()
-        self.version_string = hs.version_string
-        self.filepaths = media_repo.filepaths
-        self.max_spider_size = hs.config.max_spider_size
-        self.server_name = hs.hostname
-
     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET
--
cgit 1.4.1

From 9181e2f4c78acba89644ac21eed5ce7c9fc872c5 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 19 Apr 2016 14:48:24 +0100
Subject: Add store to PreviewUrlResource

---
 synapse/rest/media/v1/preview_url_resource.py | 1 +
 1 file changed, 1 insertion(+)

(limited to 'synapse')

diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 122b34faea..70087e959a 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -52,6 +52,7 @@ class PreviewUrlResource(Resource):
         self.filepaths = media_repo.filepaths
         self.max_spider_size = hs.config.max_spider_size
         self.server_name = hs.hostname
+        self.store = hs.get_datastore()
         self.client = SpiderHttpClient(hs)
 
         if hasattr(hs.config, "url_preview_url_blacklist"):
--
cgit 1.4.1

From a7001c311b76fbdcefc00b753fa50b1bdd3dc4cf Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 19 Apr 2016 14:49:31 +0100
Subject: _make_dirs was moved to MediaRepository

---
 synapse/rest/media/v1/preview_url_resource.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'synapse')

diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 70087e959a..3d93d928e4 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -370,7 +370,7 @@ class PreviewUrlResource(Resource):
         file_id = random_string(24)
 
         fname = self.filepaths.local_media_filepath(file_id)
-        self._makedirs(fname)
+        self.media_repo._makedirs(fname)
 
         try:
             with open(fname, "wb") as f:
--
cgit 1.4.1

From e8884e5e9cad42445e14b9f119e2a4f69334f726 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 19 Apr 2016 14:51:34 +0100
Subject: Add self.media_repo to PreviewUrlResource

---
 synapse/rest/media/v1/preview_url_resource.py | 1 +
 1 file changed, 1 insertion(+)

(limited to 'synapse')

diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 3d93d928e4..69327ac493 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -54,6 +54,7 @@ class PreviewUrlResource(Resource):
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
         self.client = SpiderHttpClient(hs)
+        self.media_repo = media_repo
 
         if hasattr(hs.config, "url_preview_url_blacklist"):
             self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
--
cgit 1.4.1

From e99365f6015af6dc0c2c107f47bda3760ff1153e Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 19 Apr 2016 15:22:14 +0100
Subject: Replicate get_invited_rooms_for_user

---
 synapse/replication/slave/storage/events.py    |  9 +++++++--
 tests/replication/slave/storage/test_events.py | 12 ++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)

(limited to 'synapse')

diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cfc728a038..82f171c257 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -68,6 +68,9 @@ class SlavedEventStore(BaseSlavedStore):
     _get_current_state_for_key = StateStore.__dict__[
         "_get_current_state_for_key"
    ]
+    get_invited_rooms_for_user = RoomMemberStore.__dict__[
+        "get_invited_rooms_for_user"
+    ]
 
     get_event = DataStore.get_event.__func__
     get_current_state = DataStore.get_current_state.__func__
@@ -82,6 +85,7 @@ class SlavedEventStore(BaseSlavedStore):
     get_room_events_stream_for_room = (
         DataStore.get_room_events_stream_for_room.__func__
     )
+
     _set_before_and_after = DataStore._set_before_and_after
 
     _get_events = DataStore._get_events.__func__
@@ -147,11 +151,11 @@ class SlavedEventStore(BaseSlavedStore):
             internal = json.loads(row[1])
             event_json = json.loads(row[2])
             event = FrozenEvent(event_json, internal_metadata_dict=internal)
-            self._invalidate_caches_for_event(
+            self.invalidate_caches_for_event(
                 event, backfilled, reset_state=position in state_resets
             )
 
-    def _invalidate_caches_for_event(self, event, backfilled, reset_state):
+    def invalidate_caches_for_event(self, event, backfilled, reset_state):
         if reset_state:
             self._get_current_state_for_key.invalidate_all()
             self.get_rooms_for_user.invalidate_all()
@@ -182,6 +186,7 @@ class SlavedEventStore(BaseSlavedStore):
         #     self._membership_stream_cache.entity_has_changed(
         #         event.state_key, event.internal_metadata.stream_ordering
         #     )
+        self.get_invited_rooms_for_user.invalidate((event.state_key,))
 
         if not event.is_state():
             return
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index baa4a26eb5..88b8d08110 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -251,6 +251,18 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
         yield self.check("get_event", [msg.event_id], redacted)
 
+    @defer.inlineCallbacks
+    def test_invites(self):
+        yield self.check("get_invited_rooms_for_user", [USER_ID_2], [])
+        event = yield self.persist(
+            type="m.room.member", key=USER_ID_2, membership="invite"
+        )
+        yield self.replicate()
+        yield self.check("get_invited_rooms_for_user", [USER_ID_2], [RoomsForUser(
+            ROOM_ID, USER_ID, "invite", event.event_id,
+            event.internal_metadata.stream_ordering
+        )])
+
     event_id = 0
 
     @defer.inlineCallbacks
--
cgit 1.4.1

From f505575f69f90eb027acb093819c083ed49a8008 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 19 Apr 2016 15:39:08 +0100
Subject: Make InsecureInterceptableContextFactory work with SpiderEndpoint

---
 synapse/http/client.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

(limited to 'synapse')

diff --git a/synapse/http/client.py b/synapse/http/client.py
index 6c89b20984..902ae7a203 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -462,5 +462,8 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
         self._context = SSL.Context(SSL.SSLv23_METHOD)
         self._context.set_verify(VERIFY_NONE, lambda *_: None)
 
-    def getContext(self, hostname, port):
+    def getContext(self, hostname=None, port=None):
         return self._context
+
+    def creatorForNetloc(self, hostname, port):
+        return self
--
cgit 1.4.1

From 5bbd424ee0c983d305014df618fa1917ecd10d91 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 19 Apr 2016 17:11:44 +0100
Subject: Add a slaved receipts store

---
 synapse/replication/slave/storage/receipts.py    | 61 ++++++++++++++++++++++++
 tests/replication/slave/storage/_base.py         |  4 +-
 tests/replication/slave/storage/test_events.py   |  3 ++
 tests/replication/slave/storage/test_receipts.py | 39 +++++++++++++++
 4 files changed, 104 insertions(+), 3 deletions(-)
 create mode 100644 synapse/replication/slave/storage/receipts.py
 create mode 100644 tests/replication/slave/storage/test_receipts.py

(limited to 'synapse')

diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
new file mode 100644
index 0000000000..b55d5dfd08
--- /dev/null
+++ b/synapse/replication/slave/storage/receipts.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.storage import DataStore
+from synapse.storage.receipts import ReceiptsStore
+
+# So, um, we want to borrow a load of functions intended for reading from
+# a DataStore, but we don't want to take functions that either write to the
+# DataStore or are cached and don't have cache invalidation logic.
+#
+# Rather than write duplicate versions of those functions, or lift them to
+# a common base class, we're going to grab the underlying __func__ object from
+# the method descriptor on the DataStore and chuck them into our class.
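+#
+# For instance (both lines appear in the class below), the assignment
+#
+#     get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+#
+# copies the @cached descriptor itself, so the cache and its invalidation
+# helpers come along with it, whereas
+#
+#     get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
+#
+# grabs just the plain, uncached function.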
+
+
+class SlavedReceiptsStore(BaseSlavedStore):
+
+    def __init__(self, db_conn, hs):
+        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
+
+        self._receipts_id_gen = SlavedIdTracker(
+            db_conn, "receipts_linearized", "stream_id"
+        )
+
+    get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+
+    get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
+    get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+
+    def stream_positions(self):
+        result = super(SlavedReceiptsStore, self).stream_positions()
+        result["receipts"] = self._receipts_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("receipts")
+        if stream:
+            self._receipts_id_gen.advance(stream["position"])
+            for row in stream["rows"]:
+                room_id, receipt_type, user_id = row[1:4]
+                self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
+
+        return super(SlavedReceiptsStore, self).process_replication(result)
+
+    def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
+        self.get_receipts_for_user.invalidate((user_id, receipt_type))
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 983caafe8a..1f13cd0bc0 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -15,8 +15,6 @@
 
 from twisted.internet import defer
 from tests import unittest
 
-from synapse.replication.slave.storage.events import SlavedEventStore
-
 from mock import Mock, NonCallableMock
 from tests.utils import setup_test_homeserver
 from synapse.replication.resource import ReplicationResource
@@ -38,7 +36,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
         self.replication = ReplicationResource(self.hs)
 
         self.master_store = self.hs.get_datastore()
-        self.slaved_store = SlavedEventStore(self.hs.get_db_conn(), self.hs)
+        self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs)
         self.event_id = 0
 
     @defer.inlineCallbacks
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index baa4a26eb5..66f166047e 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -16,6 +16,7 @@
 from ._base import BaseSlavedStoreTestCase
 
 from synapse.events import FrozenEvent, _EventInternalMetadata
 from synapse.events.snapshot import EventContext
+from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.storage.roommember import RoomsForUser
 
 from twisted.internet import defer
@@ -43,6 +44,8 @@ def patch__eq__(cls):
 
 class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
 
+    STORE_TYPE = SlavedEventStore
+
    def setUp(self):
         # Patch up the equality operator for events so that we can check
         # whether lists of events match using assertEquals
diff --git a/tests/replication/slave/storage/test_receipts.py b/tests/replication/slave/storage/test_receipts.py
new file mode 100644
index 0000000000..6624fe4eea
--- /dev/null
+++ b/tests/replication/slave/storage/test_receipts.py
@@ -0,0 +1,39 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStoreTestCase
+
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+
+from twisted.internet import defer
+
+USER_ID = "@feeling:blue"
+ROOM_ID = "!room:blue"
+EVENT_ID = "$event:blue"
+
+
+class SlavedReceiptTestCase(BaseSlavedStoreTestCase):
+
+    STORE_TYPE = SlavedReceiptsStore
+
+    @defer.inlineCallbacks
+    def test_receipt(self):
+        yield self.check("get_receipts_for_user", [USER_ID, "m.read"], {})
+        yield self.master_store.insert_receipt(
+            ROOM_ID, "m.read", USER_ID, [EVENT_ID], {}
+        )
+        yield self.replicate()
+        yield self.check("get_receipts_for_user", [USER_ID, "m.read"], {
+            ROOM_ID: EVENT_ID
+        })
--
cgit 1.4.1

From 61c7edfd34abdb9eaa7c8d3dd3dbef95b60de5de Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 19 Apr 2016 17:22:03 +0100
Subject: Add cache to _get_state_groups_from_groups

---
 synapse/storage/state.py | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c5d2a3a6df..5b743db67a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -174,6 +174,12 @@ class StateStore(SQLBaseStore):
             return [r[0] for r in results]
 
         return self.runInteraction("get_current_state_for_key", f)
 
+    @cached(num_args=2, lru=True, max_entries=1000)
+    def _get_state_group_from_group(self, group, types):
+        raise NotImplementedError()
+
+    @cachedList(cached_method_name="_get_state_group_from_group",
+                list_name="groups", num_args=2, inlineCallbacks=True)
     def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
         """
@@ -201,18 +207,23 @@ class StateStore(SQLBaseStore):
             txn.execute(sql, args)
             rows = self.cursor_to_dict(txn)
 
-            results = {}
+            results = {group: {} for group in groups}
             for row in rows:
                 key = (row["type"], row["state_key"])
-                results.setdefault(row["state_group"], {})[key] = row["event_id"]
+                results[row["state_group"]][key] = row["event_id"]
             return results
 
+        results = {}
+
         chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
         for chunk in chunks:
-            return self.runInteraction(
+            res = yield self.runInteraction(
                 "_get_state_groups_from_groups",
                 f, chunk
             )
+            results.update(res)
+
+        defer.returnValue(results)
 
     @defer.inlineCallbacks
     def get_state_for_events(self, event_ids, types):
@@ -359,6 +370,8 @@ class StateStore(SQLBaseStore):
             a `state_key` of None matches all state_keys. If `types` is None
             then all events are returned.
""" + if types: + types = frozenset(types) results = {} missing_groups = [] if types is not None: -- cgit 1.4.1 From 4cf4320593f9d3448e28819e094b42eadab6967d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Apr 2016 11:06:02 +0100 Subject: Add some logging to state resolve_events --- synapse/state.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse') diff --git a/synapse/state.py b/synapse/state.py index 58211f5feb..cca9167e5b 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -230,6 +230,8 @@ class StateHandler(object): (cache.state_group, state, prev_states) ) + logger.info("Resolving state for %s with %d groups", room_id, len(state_groups)) + new_state, prev_states = self._resolve_events( state_groups.values(), event_type, state_key ) @@ -246,6 +248,9 @@ class StateHandler(object): defer.returnValue((None, new_state, prev_states)) def resolve_events(self, state_sets, event): + logger.info( + "Resolving state for %s with %d groups", event.room_id, len(state_sets) + ) if event.is_state(): return self._resolve_events( state_sets, event.type, event.state_key -- cgit 1.4.1 From 5bbc3215887a653796a09178fbb69c38e241259d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Apr 2016 11:39:54 +0100 Subject: Always use state cache entry if it exists Also check if the resolved state matches an existing state group. --- synapse/state.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/state.py b/synapse/state.py index cca9167e5b..d0f76dc4f5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -214,7 +214,7 @@ class StateHandler(object): if self._state_cache is not None: cache = self._state_cache.get(group_names, None) - if cache and cache.state_group: + if cache: cache.ts = self.clock.time_msec() event_dict = yield self.store.get_events(cache.state.values()) @@ -236,16 +236,23 @@ class StateHandler(object): state_groups.values(), event_type, state_key ) + state_group = None + new_state_event_ids = frozenset(e.event_id for e in new_state.values()) + for sg, events in state_groups.items(): + if new_state_event_ids == frozenset(e.event_id for e in events): + state_group = sg + break + if self._state_cache is not None: cache = _StateCacheEntry( state={key: event.event_id for key, event in new_state.items()}, - state_group=None, + state_group=state_group, ts=self.clock.time_msec() ) self._state_cache[group_names] = cache - defer.returnValue((None, new_state, prev_states)) + defer.returnValue((state_group, new_state, prev_states)) def resolve_events(self, state_sets, event): logger.info( -- cgit 1.4.1 From c0d8e0eb6375d5b8754db420af43733a642c7f38 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 Apr 2016 15:25:47 +0100 Subject: Replicate push actions --- synapse/replication/slave/storage/events.py | 14 +++++++++ tests/replication/slave/storage/test_events.py | 43 ++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) (limited to 'synapse') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 82f171c257..5f37ba6995 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -21,6 +21,7 @@ from synapse.storage import DataStore from synapse.storage.room import RoomStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore +from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.state import 
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -71,7 +72,16 @@ class SlavedEventStore(BaseSlavedStore):
     get_invited_rooms_for_user = RoomMemberStore.__dict__[
         "get_invited_rooms_for_user"
     ]
+    get_unread_event_push_actions_by_room_for_user = (
+        EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
+    )
 
+    get_unread_push_actions_for_user_in_range = (
+        DataStore.get_unread_push_actions_for_user_in_range.__func__
+    )
+    get_push_action_users_in_range = (
+        DataStore.get_push_action_users_in_range.__func__
+    )
     get_event = DataStore.get_event.__func__
     get_current_state = DataStore.get_current_state.__func__
     get_current_state_for_key = DataStore.get_current_state_for_key.__func__
@@ -167,6 +177,10 @@ class SlavedEventStore(BaseSlavedStore):
 
         self.get_latest_event_ids_in_room.invalidate((event.room_id,))
 
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+            (event.room_id,)
+        )
+
         if not backfilled:
             self._events_stream_cache.entity_has_changed(
                 event.room_id, event.internal_metadata.stream_ordering
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 41a626cf70..17587fda00 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -266,6 +266,47 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             event.internal_metadata.stream_ordering
         )])
 
+    @defer.inlineCallbacks
+    def test_push_actions_for_user(self):
+        yield self.persist(type="m.room.create", creator=USER_ID)
+        yield self.persist(type="m.room.join", key=USER_ID, membership="join")
+        yield self.persist(
+            type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"
+        )
+        event1 = yield self.persist(
+            type="m.room.message", msgtype="m.text", body="hello"
+        )
+        yield self.replicate()
+        yield self.check(
+            "get_unread_event_push_actions_by_room_for_user",
+            [ROOM_ID, USER_ID_2, event1.event_id],
+            {"highlight_count": 0, "notify_count": 0}
+        )
+
+        yield self.persist(
+            type="m.room.message", msgtype="m.text", body="world",
+            push_actions=[(USER_ID_2, ["notify"])],
+        )
+        yield self.replicate()
+        yield self.check(
+            "get_unread_event_push_actions_by_room_for_user",
+            [ROOM_ID, USER_ID_2, event1.event_id],
+            {"highlight_count": 0, "notify_count": 1}
+        )
+
+        yield self.persist(
+            type="m.room.message", msgtype="m.text", body="world",
+            push_actions=[(USER_ID_2, [
+                "notify", {"set_tweak": "highlight", "value": True}
+            ])],
+        )
+        yield self.replicate()
+        yield self.check(
+            "get_unread_event_push_actions_by_room_for_user",
+            [ROOM_ID, USER_ID_2, event1.event_id],
+            {"highlight_count": 1, "notify_count": 2}
+        )
+
     event_id = 0
 
     @defer.inlineCallbacks
@@ -273,6 +314,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, internal={},
         state=None, reset_state=False, backfill=False,
         depth=None, prev_events=[], auth_events=[], prev_state=[], redacts=None,
+        push_actions=[],
         **content
     ):
         """
@@ -305,6 +347,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             self.event_id += 1
 
         context = EventContext(current_state=state)
+        context.push_actions = push_actions
 
         ordering = None
         if backfill:
--
cgit 1.4.1

From d4823efad9d1d881c12c779e9e94691d90d8902b Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 21 Apr 2016 16:18:00 +0100
Subject: Replicate the pushers

---
 synapse/replication/slave/storage/pushers.py | 52 ++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
 create mode 100644 synapse/replication/slave/storage/pushers.py

(limited to 'synapse')

diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
new file mode 100644
index 0000000000..8faddb2595
--- /dev/null
+++ b/synapse/replication/slave/storage/pushers.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.storage import DataStore
+
+
+class SlavedPusherStore(BaseSlavedStore):
+
+    def __init__(self, db_conn, hs):
+        super(SlavedPusherStore, self).__init__(db_conn, hs)
+        self._pushers_id_gen = SlavedIdTracker(
+            db_conn, "pushers", "id",
+            extra_tables=[("deleted_pushers", "stream_id")],
+        )
+
+    get_all_pushers = DataStore.get_all_pushers.__func__
+    get_pushers_by = DataStore.get_pushers_by.__func__
+    get_pushers_by_app_id_and_pushkey = (
+        DataStore.get_pushers_by_app_id_and_pushkey.__func__
+    )
+    _decode_pushers_rows = DataStore._decode_pushers_rows.__func__
+
+    def stream_positions(self):
+        result = super(SlavedPusherStore, self).stream_positions()
+        result["pushers"] = self._pushers_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("pushers")
+        if stream:
+            self._pushers_id_gen.advance(stream["position"])
+
+        stream = result.get("deleted_pushers")
+        if stream:
+            self._pushers_id_gen.advance(stream["position"])
+
+        return super(SlavedPusherStore, self).process_replication(result)
--
cgit 1.4.1

From cfe1ff4bdb9296ff2a7dc167dabf4397f81634f7 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 21 Apr 2016 16:33:05 +0100
Subject: Add a replication endpoint for deleting pushers

---
 synapse/replication/pusher_resource.py | 53 ++++++++++++++++++++++++++++
 synapse/replication/resource.py        |  7 +++--
 2 files changed, 57 insertions(+), 3 deletions(-)
 create mode 100644 synapse/replication/pusher_resource.py

(limited to 'synapse')

diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py
new file mode 100644
index 0000000000..b87026d79a
--- /dev/null
+++ b/synapse/replication/pusher_resource.py
@@ -0,0 +1,53 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
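+
+# Mounted as "remove_pushers" under the replication resource (see the
+# resource.py change below), so that worker processes can ask the main
+# process to delete pushers that a push gateway has rejected.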
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import defer
+
+
+class PusherResource(Resource):
+    """
+    HTTP endpoint for deleting rejected pushers
+    """
+
+    def __init__(self, hs):
+        Resource.__init__(self)  # Resource is old-style, so no super()
+
+        self.version_string = hs.version_string
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+
+    def render_POST(self, request):
+        self._async_render_POST(request)
+        return NOT_DONE_YET
+
+    @request_handler
+    @defer.inlineCallbacks
+    def _async_render_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        for remove in content["remove"]:
+            yield self.store.delete_pusher_by_app_id_pushkey_user_id(
+                remove["app_id"],
+                remove["push_key"],
+                remove["user_id"],
+            )
+
+        self.notifier.on_new_replication_data()
+
+        respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index a543af68f8..e5c9a53929 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -15,6 +15,7 @@
 
 from synapse.http.servlet import parse_integer, parse_string
 from synapse.http.server import request_handler, finish_request
+from synapse.replication.pusher_resource import PusherResource
 
 from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
@@ -102,8 +103,6 @@ class ReplicationResource(Resource):
     long-polling this replication API for new data on those streams.
     """
 
-    isLeaf = True
-
     def __init__(self, hs):
         Resource.__init__(self)  # Resource is old-style, so no super()
 
@@ -114,6 +113,8 @@ class ReplicationResource(Resource):
         self.typing_handler = hs.get_handlers().typing_notification_handler
         self.notifier = hs.notifier
 
+        self.putChild("remove_pushers", PusherResource(hs))
+
     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET
@@ -343,7 +344,7 @@ class ReplicationResource(Resource):
             "app_id", "app_display_name", "device_display_name", "pushkey",
             "ts", "lang", "data"
         ))
-        writer.write_header_and_rows("deleted", deleted, (
+        writer.write_header_and_rows("deleted_pushers", deleted, (
             "position", "user_id", "app_id", "pushkey"
         ))
--
cgit 1.4.1

From c877f0f0345f1ff6d329af2920d7d1a6b5659a86 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 16:41:39 +0100
Subject: Optimise event_search in postgres

---
 synapse/storage/room.py                          | 16 ++++--
 synapse/storage/schema/delta/31/search_update.py | 65 ++++++++++++++++++++++++
 synapse/storage/search.py                        | 61 +++++++++++++++++++++-
 3 files changed, 137 insertions(+), 5 deletions(-)
 create mode 100644 synapse/storage/schema/delta/31/search_update.py

(limited to 'synapse')

diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 9be977f387..70aa64fb31 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -169,20 +169,28 @@ class RoomStore(SQLBaseStore):
     def _store_event_search_txn(self, txn, event, key, value):
         if isinstance(self.database_engine, PostgresEngine):
             sql = (
-                "INSERT INTO event_search (event_id, room_id, key, vector)"
-                " VALUES (?,?,?,to_tsvector('english', ?))"
+                "INSERT INTO event_search"
+                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
+                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
+            )
+            txn.execute(
+                sql,
+                (
+                    event.event_id, event.room_id, key, value,
+                    event.internal_metadata.stream_ordering,
+                    event.origin_server_ts,
+                )
             )
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = (
                 "INSERT INTO event_search (event_id, room_id, key, value)"
                 " VALUES (?,?,?,?)"
             )
+            txn.execute(sql, (event.event_id, event.room_id, key, value,))
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
 
-        txn.execute(sql, (event.event_id, event.room_id, key, value,))
-
     @cachedInlineCallbacks()
     def get_room_name_and_aliases(self, room_id):
         def f(txn):
diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
new file mode 100644
index 0000000000..46a3795d12
--- /dev/null
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -0,0 +1,65 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
+import logging
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+ALTER_TABLE = """
+ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT;
+ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT;
+
+CREATE INDEX event_search_room_order ON event_search(
+    room_id, origin_server_ts, stream_ordering
+);
+CREATE INDEX event_search_order ON event_search(origin_server_ts, stream_ordering);
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    if not isinstance(database_engine, PostgresEngine):
+        return
+
+    for statement in get_statements(ALTER_TABLE.splitlines()):
+        cur.execute(statement)
+
+    cur.execute("SELECT MIN(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    min_stream_id = rows[0][0]
+
+    cur.execute("SELECT MAX(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    max_stream_id = rows[0][0]
+
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+        }
+        progress_json = ujson.dumps(progress)
+
+        sql = (
+            "INSERT into background_updates (update_name, progress_json)"
+            " VALUES (?, ?)"
+        )
+
+        sql = database_engine.convert_param_style(sql)
+
+        cur.execute(sql, ("event_search_order", progress_json))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 59ac7f424c..375057fa3e 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -29,12 +29,17 @@ logger = logging.getLogger(__name__)
 
 class SearchStore(BackgroundUpdateStore):
 
     EVENT_SEARCH_UPDATE_NAME = "event_search"
+    EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
 
     def __init__(self, hs):
         super(SearchStore, self).__init__(hs)
         self.register_background_update_handler(
             self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
         )
+        self.register_background_update_handler(
+            self.EVENT_SEARCH_ORDER_UPDATE_NAME,
+            self._background_reindex_search_order
+        )
 
     @defer.inlineCallbacks
     def _background_reindex_search(self, progress, batch_size):
@@ -131,6 +136,61 @@ class SearchStore(BackgroundUpdateStore):
 
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
+    def _background_reindex_search_order(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_search_txn(txn):
+            sql = (
+                "SELECT stream_ordering, origin_server_ts, event_id FROM events"
+                " INNER JOIN event_search USING (room_id, event_id)"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+
+            sql = (
+                "UPDATE event_search SET stream_ordering = ?, origin_server_ts = ?"
+                " WHERE event_id = ?"
+            )
+
+            for index in range(0, len(rows), INSERT_CLUMP_SIZE):
+                clump = rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_SEARCH_ORDER_UPDATE_NAME)
+
+        defer.returnValue(result)
+
     @defer.inlineCallbacks
     def search_msgs(self, room_ids, search_term, keys):
         """Performs a full text search over events with given keys.
@@ -310,7 +370,6 @@ class SearchStore(BackgroundUpdateStore):
             "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) as rank,"
             " origin_server_ts, stream_ordering, room_id, event_id"
             " FROM event_search"
-            " NATURAL JOIN events"
             " WHERE vector @@ to_tsquery('english', ?) AND "
         )
         args = [search_query, search_query] + args
--
cgit 1.4.1

From 565c2edb0ace48a0e8b1bd62199bf0740554cc63 Mon Sep 17 00:00:00 2001
From: Niklas Riekenbrauck
Date: Fri, 1 Apr 2016 19:04:28 +0200
Subject: Fix issues with JWT login

---
 synapse/config/jwt.py           | 2 ++
 synapse/rest/client/v1/login.py | 9 ++++++---
 2 files changed, 8 insertions(+), 3 deletions(-)

(limited to 'synapse')

diff --git a/synapse/config/jwt.py b/synapse/config/jwt.py
index 4cb092bbec..5c8199612b 100644
--- a/synapse/config/jwt.py
+++ b/synapse/config/jwt.py
@@ -30,6 +30,8 @@ class JWTConfig(Config):
 
     def default_config(self, **kwargs):
         return """\
+        # The JWT needs to contain a globally unique "sub" (subject) claim.
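+        # For example, a token whose payload decodes to
+        #
+        #     {"sub": "frank"}
+        #
+        # identifies the user as "frank"; tokens with no "sub" claim, a bad
+        # signature, or an expired signature are rejected with a 401.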
+        #
         # jwt_config:
         #    enabled: true
         #    secret: "a secret"
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index d14ce3efa2..166a78026a 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -224,16 +224,19 @@ class LoginRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def do_jwt_login(self, login_submission):
-        token = login_submission['token']
+        token = login_submission.get("token", None)
         if token is None:
-            raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
+            raise LoginError(401, "Token field for JWT is missing",
+                             errcode=Codes.UNAUTHORIZED)
 
         try:
             payload = jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
+        except jwt.ExpiredSignatureError:
+            raise LoginError(401, "JWT expired", errcode=Codes.UNAUTHORIZED)
         except InvalidTokenError:
             raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
 
-        user = payload['user']
+        user = payload.get("sub", None)
         if user is None:
             raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
 
--
cgit 1.4.1

From b743c1237e68e75056b83ea4ab93ba2e1ec44b7e Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 17:12:04 +0100
Subject: Add missing run_upgrade

---
 synapse/storage/schema/delta/31/search_update.py | 4 ++++
 1 file changed, 4 insertions(+)

(limited to 'synapse')

diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
index 46a3795d12..989e990dbd 100644
--- a/synapse/storage/schema/delta/31/search_update.py
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -63,3 +63,7 @@ def run_create(cur, database_engine, *args, **kwargs):
         sql = database_engine.convert_param_style(sql)
 
         cur.execute(sql, ("event_search_order", progress_json))
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    pass
--
cgit 1.4.1

From 51bb339ab2130ab29ee9fcfec48d8e62f46c75f6 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 17:16:11 +0100
Subject: Create index concurrently

---
 synapse/storage/schema/delta/31/search_update.py |  6 +-----
 synapse/storage/search.py                        | 14 +++++++++++++-
 2 files changed, 14 insertions(+), 6 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
index 989e990dbd..470ae0c005 100644
--- a/synapse/storage/schema/delta/31/search_update.py
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -24,11 +24,6 @@ logger = logging.getLogger(__name__)
 ALTER_TABLE = """
 ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT;
 ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT;
-
-CREATE INDEX event_search_room_order ON event_search(
-    room_id, origin_server_ts, stream_ordering
-);
-CREATE INDEX event_search_order ON event_search(origin_server_ts, stream_ordering);
 """
 
 
@@ -52,6 +47,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "target_min_stream_id_inclusive": min_stream_id,
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
+            "have_added_indexes": False,
         }
         progress_json = ujson.dumps(progress)
 
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 375057fa3e..548e9eeaef 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -141,10 +141,21 @@ class SearchStore(BackgroundUpdateStore):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
+        have_added_index = progress['have_added_indexes']
 
         INSERT_CLUMP_SIZE = 1000
 
         def reindex_search_txn(txn):
+            if not have_added_index:
+                txn.execute(
+                    "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
+                    "room_id, origin_server_ts, stream_ordering)"
+                )
+                txn.execute(
+                    "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
+                    "origin_server_ts, stream_ordering)"
+                )
+
             sql = (
                 "SELECT stream_ordering, origin_server_ts, event_id FROM events"
                 " INNER JOIN event_search USING (room_id, event_id)"
@@ -173,7 +184,8 @@ class SearchStore(BackgroundUpdateStore):
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
                 "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows)
+                "rows_inserted": rows_inserted + len(rows),
+                "have_added_index": True,
             }
 
--
cgit 1.4.1

From a3ac837599f62b77f458505f841cee6072c1f921 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 21 Apr 2016 17:21:02 +0100
Subject: Optionally split out the pushers into a separate process

---
 synapse/app/pusher.py      | 208 +++++++++++++++++++++++++++++++++++++++++++++
 synapse/config/server.py   |   1 +
 synapse/push/httppusher.py |   2 +-
 synapse/push/pusherpool.py |   4 +
 synapse/server.py          |   3 +
 5 files changed, 217 insertions(+), 1 deletion(-)
 create mode 100644 synapse/app/pusher.py

(limited to 'synapse')

diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
new file mode 100644
index 0000000000..8922573db7
--- /dev/null
+++ b/synapse/app/pusher.py
@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
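+
+# A standalone pusher worker: it loads slaved (read-only) copies of the
+# stores it needs, tails the master's replication stream over HTTP, and
+# drives the pusher pool itself (see PusherServer.replicate below).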
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.util.versionstring import get_version_string
+from synapse.config._base import ConfigError
+from synapse.config.database import DatabaseConfig
+from synapse.config.logger import LoggingConfig
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.storage.engines import create_engine
+from synapse.storage import DataStore
+from synapse.util.async import sleep
+from synapse.util.logcontext import (LoggingContext, preserve_fn)
+
+from twisted.internet import reactor, defer
+
+import sys
+import logging
+
+logger = logging.getLogger("synapse.app.pusher")
+
+
+class SlaveConfig(DatabaseConfig):
+    def read_config(self, config):
+        self.replication_url = config["replication_url"]
+        self.server_name = config["server_name"]
+        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
+            "use_insecure_ssl_client_just_for_testing_do_not_use", False
+        )
+        self.user_agent_suffix = None
+        self.start_pushers = True
+
+    def default_config(self, **kwargs):
+        return """\
+        ## Slave ##
+        #replication_url: https://localhost:{replication_port}/_synapse/replication
+
+        report_stats: False
+        """
+
+
+class PusherSlaveConfig(SlaveConfig, LoggingConfig):
+    pass
+
+
+class PusherSlaveStore(
+    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore
+):
+    update_pusher_last_stream_ordering_and_success = (
+        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+    )
+
+
+class PusherServer(HomeServer):
+
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
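+        # (For example "cp_min" and "cp_max", which twisted's
+        # adbapi.ConnectionPool interprets as connection pool bounds.)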
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = PusherSlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def remove_pusher(self, app_id, push_key, user_id):
+        http_client = self.get_simple_http_client()
+        replication_url = self.config.replication_url
+        url = replication_url + "/remove_pushers"
+        return http_client.post_json_get_json(url, {
+            "remove": [{
+                "app_id": app_id,
+                "push_key": push_key,
+                "user_id": user_id,
+            }]
+        })
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.replication_url
+        pusher_pool = self.get_pusherpool()
+
+        def stop_pusher(user_id, app_id, pushkey):
+            key = "%s:%s" % (app_id, pushkey)
+            pushers_for_user = pusher_pool.pushers.get(user_id, {})
+            pusher = pushers_for_user.pop(key, None)
+            if pusher is None:
+                return
+            logger.info("Stopping pusher %r / %r", user_id, key)
+            pusher.on_stop()
+
+        def start_pusher(user_id, app_id, pushkey):
+            key = "%s:%s" % (app_id, pushkey)
+            logger.info("Starting pusher %r / %r", user_id, key)
+            return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+
+        @defer.inlineCallbacks
+        def poke_pushers(results):
+            pushers_rows = set(
+                map(tuple, results.get("pushers", {}).get("rows", []))
+            )
+            deleted_pushers_rows = set(
+                map(tuple, results.get("deleted_pushers", {}).get("rows", []))
+            )
+            for row in sorted(pushers_rows | deleted_pushers_rows):
+                if row in deleted_pushers_rows:
+                    user_id, app_id, pushkey = row[1:4]
+                    stop_pusher(user_id, app_id, pushkey)
+                elif row in pushers_rows:
+                    user_id = row[1]
+                    app_id = row[5]
+                    pushkey = row[8]
+                    yield start_pusher(user_id, app_id, pushkey)
+
+            stream = results.get("events")
+            if stream:
+                min_stream_id = stream["rows"][0][0]
+                max_stream_id = stream["position"]
+                preserve_fn(pusher_pool.on_new_notifications)(
+                    min_stream_id, max_stream_id
+                )
+
+            stream = results.get("receipts")
+            if stream:
+                rows = stream["rows"]
+                affected_room_ids = set(row[1] for row in rows)
+                min_stream_id = rows[0][0]
+                max_stream_id = stream["position"]
+                preserve_fn(pusher_pool.on_new_receipts)(
+                    min_stream_id, max_stream_id, affected_room_ids
+                )
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+                poke_pushers(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                sleep(30)
+
+
+def setup(config_options):
+    try:
+        config = PusherSlaveConfig.load_config(
+            "Synapse pusher", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    config.setup_logging()
+
+    database_engine = create_engine(config.database_config)
+
+    ps = PusherServer(
+        config.server_name,
+        db_config=config.database_config,
+        config=config,
+        version_string=get_version_string("Synapse", synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+
+    def start():
+        ps.replicate()
+        ps.get_pusherpool().start()
+        ps.get_datastore().start_profiling()
+
+    reactor.callWhenRunning(start)
+
+    return ps
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        ps = setup(sys.argv[1:])
+        reactor.run()
diff --git a/synapse/config/server.py b/synapse/config/server.py
index df4707e1d1..46c633548a 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -28,6 +28,7 @@ class ServerConfig(Config):
         self.print_pidfile = config.get("print_pidfile")
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", True)
+        self.start_pushers = config.get("start_pushers", True)
 
         self.listeners = config.get("listeners", [])
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 6950a20632..3992804845 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -230,7 +230,7 @@ class HttpPusher(object):
                     "Pushkey %s was rejected: removing",
                     pk
                 )
-                yield self.hs.get_pusherpool().remove_pusher(
+                yield self.hs.remove_pusher(
                     self.app_id, pk, self.user_id
                 )
         defer.returnValue(True)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index aa095f9d9b..6ef48d63f7 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -29,6 +29,7 @@ logger = logging.getLogger(__name__)
 class PusherPool:
     def __init__(self, _hs):
         self.hs = _hs
+        self.start_pushers = _hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
         self.pushers = {}
@@ -177,6 +178,9 @@ class PusherPool:
             self._start_pushers([p])
 
     def _start_pushers(self, pushers):
+        if not self.start_pushers:
+            logger.info("Not starting pushers because they are disabled in the config")
+            return
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
             try:
diff --git a/synapse/server.py b/synapse/server.py
index 368d615576..ee138de756 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -193,6 +193,9 @@ class HomeServer(object):
             **self.db_config.get("args", {})
         )
 
+    def remove_pusher(self, app_id, push_key, user_id):
+        return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
+
 
 def _make_dependency_method(depname):
     def _get(hs):
--
cgit 1.4.1

From 129e4034870956126daf520b41f74a51040ddbf9 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 17:19:25 +0100
Subject: Create index must be on a conn

---
 synapse/storage/search.py | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 548e9eeaef..05641fb579 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -143,19 +143,26 @@ class SearchStore(BackgroundUpdateStore):
         rows_inserted = progress.get("rows_inserted", 0)
         have_added_index = progress['have_added_indexes']
 
-        INSERT_CLUMP_SIZE = 1000
-
-        def reindex_search_txn(txn):
-            if not have_added_index:
-                txn.execute(
+        if not have_added_index:
+            def create_index(conn):
+                conn.rollback()
+                conn.set_session(autocommit=True)
+                c = conn.cursor()
+                c.execute(
                     "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
                     "room_id, origin_server_ts, stream_ordering)"
                 )
-                txn.execute(
+                c.execute(
                     "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
                     "origin_server_ts, stream_ordering)"
                 )
+                conn.set_session(autocommit=False)
+
+            yield self.runWithConnection(create_index)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_search_txn(txn):
             sql = (
                 "SELECT stream_ordering, origin_server_ts, event_id FROM events"
                 " INNER JOIN event_search USING (room_id, event_id)"
--
cgit 1.4.1

From 3b0fa77f5050bb54dccc5140a76b171d6603f2e7 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 17:37:42 +0100
Subject: Fix SQL statement

---
 synapse/storage/search.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 05641fb579..f7730f5d7c 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -164,10 +164,10 @@ class SearchStore(BackgroundUpdateStore):
 
         def reindex_search_txn(txn):
             sql = (
-                "SELECT stream_ordering, origin_server_ts, event_id FROM events"
+                "SELECT e.stream_ordering, e.origin_server_ts, event_id FROM events as e"
                 " INNER JOIN event_search USING (room_id, event_id)"
-                " WHERE ? <= stream_ordering AND stream_ordering < ?"
-                " ORDER BY stream_ordering DESC"
+                " WHERE ? <= e.stream_ordering AND e.stream_ordering < ?"
+                " ORDER BY e.stream_ordering DESC"
                 " LIMIT ?"
             )
 
--
cgit 1.4.1

From e395eb1108abac2ada1f846d08285a84fd5042a2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 17:39:24 +0100
Subject: Update progress when creating index

---
 synapse/storage/search.py | 7 +++++++
 1 file changed, 7 insertions(+)

(limited to 'synapse')

diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f7730f5d7c..1baed674a8 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -158,6 +158,13 @@ class SearchStore(BackgroundUpdateStore):
                 )
                 conn.set_session(autocommit=False)
 
+                pg = dict(progress)
+                pg["have_added_indexes"] = True
+
+                self._background_update_progress_txn(
+                    conn.cursor(), self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress
+                )
+
             yield self.runWithConnection(create_index)
 
         INSERT_CLUMP_SIZE = 1000
--
cgit 1.4.1

From 26db18bc90e745e1eb054e2ffd6969918a6253c4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 17:45:56 +0100
Subject: Need to do _background_update_progress_txn in actual transaction

---
 synapse/storage/search.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 1baed674a8..fd40b44a9a 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -158,14 +158,16 @@ class SearchStore(BackgroundUpdateStore):
                 )
                 conn.set_session(autocommit=False)
 
-                pg = dict(progress)
-                pg["have_added_indexes"] = True
+            yield self.runWithConnection(create_index)
 
-                self._background_update_progress_txn(
-                    conn.cursor(), self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress
-                )
+            pg = dict(progress)
+            pg["have_added_indexes"] = True
 
-            yield self.runWithConnection(create_index)
+            yield self.runInteraction(
+                self.EVENT_SEARCH_ORDER_UPDATE_NAME,
+                self._background_update_progress_txn,
+                self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress,
+            )
 
         INSERT_CLUMP_SIZE = 1000
 
--
cgit 1.4.1

From b57dcb4b51d31914756412ec00d94646bc3b4c79 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 17:49:00 +0100
Subject: Typo

---
 synapse/storage/search.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'synapse')

diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index fd40b44a9a..dc47425c23 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -201,7 +201,7 @@ class SearchStore(BackgroundUpdateStore):
                 "target_min_stream_id_inclusive": target_min_stream_id,
                 "max_stream_id_exclusive": min_stream_id,
                 "rows_inserted": rows_inserted + len(rows),
-                "have_added_index": True,
+                "have_added_indexes": True,
             }
 
--
cgit 1.4.1

From 8fae3d7b1eea87b48db96f1671d850a4a247e926 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 18:01:49 +0100
Subject: Use special UPDATE syntax

---
 synapse/storage/schema/delta/31/search_update.py |  4 +--
 synapse/storage/search.py                        | 32 +++++++++---------------
 2 files changed, 14 insertions(+), 22 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
index 470ae0c005..2c15edd1a4 100644
--- a/synapse/storage/schema/delta/31/search_update.py
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -22,8 +22,8 @@ logger = logging.getLogger(__name__)
 
 ALTER_TABLE = """
-ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT;
-ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT;
+ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT DEFAULT 0;
+ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT DEFAULT 0;
 """
 
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index dc47425c23..813e1e90ac 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -169,34 +169,26 @@ class SearchStore(BackgroundUpdateStore):
 
-        INSERT_CLUMP_SIZE = 1000
-
-        def reindex_search_txn(txn):
-            sql = (
-                "SELECT e.stream_ordering, e.origin_server_ts, event_id FROM events as e"
-                " INNER JOIN event_search USING (room_id, event_id)"
-                " WHERE ? <= e.stream_ordering AND e.stream_ordering < ?"
-                " ORDER BY e.stream_ordering DESC"
+        def reindex_search_txn(txn):
+            events_sql = (
+                "SELECT stream_ordering, origin_server_ts, event_id FROM events"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
                 " LIMIT ?"
             )
 
+            sql = (
+                "UPDATE event_search AS es SET es.stream_ordering = e.stream_ordering,"
+                " es.origin_server_ts = e.origin_server_ts"
+                " FROM (%s) AS e"
+                " WHERE e.event_id = es.event_id"
+                " RETURNING es.stream_ordering"
+            ) % (events_sql,)
+
             txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
             rows = txn.fetchall()
-            if not rows:
-                return 0
-
             min_stream_id = rows[-1][0]
 
-            sql = (
-                "UPDATE event_search SET stream_ordering = ?, origin_server_ts = ?"
-                " WHERE event_id = ?"
-            )
-
-            for index in range(0, len(rows), INSERT_CLUMP_SIZE):
-                clump = rows[index:index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
-
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
                 "max_stream_id_exclusive": min_stream_id,
--
cgit 1.4.1

From 3ddbb1687ced5eb9b9a87367fcd6b754b8d0c5dc Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 18:02:36 +0100
Subject: Fix query

---
 synapse/storage/search.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 813e1e90ac..dd3486783d 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -178,8 +178,8 @@ class SearchStore(BackgroundUpdateStore):
 
             sql = (
-                "UPDATE event_search AS es SET es.stream_ordering = e.stream_ordering,"
-                " es.origin_server_ts = e.origin_server_ts"
+                "UPDATE event_search AS es SET stream_ordering = e.stream_ordering,"
+                " origin_server_ts = e.origin_server_ts"
                 " FROM (%s) AS e"
                 " WHERE e.event_id = es.event_id"
                 " RETURNING es.stream_ordering"
--
cgit 1.4.1

From ae571810f2283c1825da62af0e931a0e40f74168 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 21 Apr 2016 18:09:48 +0100
Subject: Order NULLs first

---
 synapse/storage/schema/delta/31/search_update.py |  4 ++--
 synapse/storage/search.py                        | 17 ++++++++++++++---
 2 files changed, 16 insertions(+), 5 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
index 2c15edd1a4..470ae0c005 100644
--- a/synapse/storage/schema/delta/31/search_update.py
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -22,8 +22,8 @@ logger = logging.getLogger(__name__)
 
 ALTER_TABLE = """
-ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT DEFAULT 0;
-ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT DEFAULT 0;
+ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT;
+ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT;
 """
 
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index dd3486783d..2c71db8c96 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -148,13 +148,16 @@ class SearchStore(BackgroundUpdateStore):
                 conn.rollback()
                 conn.set_session(autocommit=True)
                 c = conn.cursor()
+
+                # We create with NULLS FIRST so that when we search *backwards*
+                # we get the ones with non null origin_server_ts *first*
                 c.execute(
                     "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
-                    "room_id, origin_server_ts, stream_ordering)"
+                    "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
                 )
                 c.execute(
                     "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
-                    "origin_server_ts, stream_ordering)"
+                    "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
                 )
                 conn.set_session(autocommit=False)
 
@@ -434,7 +437,15 @@ class SearchStore(BackgroundUpdateStore):
 
         # We add an arbitrary limit here to ensure we don't try to pull the
        # entire table from the database.
-        sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
+        if isinstance(self.database_engine, PostgresEngine):
+            sql += (
+                " ORDER BY origin_server_ts DESC NULLS LAST,"
+                " stream_ordering DESC NULLS LAST LIMIT ?"
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
+ else: + raise Exception("Unrecognized database engine") args.append(limit) -- cgit 1.4.1 From 183cacac90ca237b448da244270d55920470389b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Apr 2016 09:37:16 +0100 Subject: Simplify query and handle finishing correctly --- synapse/storage/background_updates.py | 3 ++- synapse/storage/search.py | 30 ++++++++++++++---------------- 2 files changed, 16 insertions(+), 17 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 49904046cf..66a995157d 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -173,11 +173,12 @@ class BackgroundUpdateStore(SQLBaseStore): logger.info( "Updating %r. Updated %r items in %rms." - " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)", + " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)", update_name, items_updated, duration_ms, performance.total_items_per_ms(), performance.average_items_per_ms(), performance.total_item_count, + batch_size, ) performance.update(items_updated, duration_ms) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 2c71db8c96..0224299625 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -169,28 +169,26 @@ class SearchStore(BackgroundUpdateStore): yield self.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_update_progress_txn, - self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress, + self.EVENT_SEARCH_ORDER_UPDATE_NAME, pg, ) def reindex_search_txn(txn): - events_sql = ( - "SELECT stream_ordering, origin_server_ts, event_id FROM events" - " WHERE ? <= stream_ordering AND stream_ordering < ?" - " ORDER BY stream_ordering DESC" - " LIMIT ?" - ) - sql = ( "UPDATE event_search AS es SET stream_ordering = e.stream_ordering," " origin_server_ts = e.origin_server_ts" - " FROM (%s) AS e" + " FROM events AS e" " WHERE e.event_id = es.event_id" + " AND ? <= e.stream_ordering AND e.stream_ordering < ?" " RETURNING es.stream_ordering" - ) % (events_sql,) + ) - txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + min_stream_id = max_stream_id - batch_size + txn.execute(sql, (min_stream_id, max_stream_id)) rows = txn.fetchall() - min_stream_id = rows[-1][0] + + if min_stream_id < target_min_stream_id: + # We've reached the end.
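+ # Returning False here tells the caller below that this background + # update has finished, so it calls _end_background_update instead of + # scheduling another batch.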
+ return len(rows), False progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -203,16 +201,16 @@ class SearchStore(BackgroundUpdateStore): txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress ) - return len(rows) + return len(rows), True - result = yield self.runInteraction( + num_rows, finished = yield self.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn ) - if not result: + if not finished: yield self._end_background_update(self.EVENT_SEARCH_ORDER_UPDATE_NAME) - defer.returnValue(result) + defer.returnValue(num_rows) @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): -- cgit 1.4.1 From 9e7aa98c229af4f657756f9089654d2eab7a96ce Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 Apr 2016 15:40:51 +0100 Subject: Split out create_resource_tree to a separate file --- synapse/app/homeserver.py | 89 +++--------------------------------- synapse/util/httpresourcetree.py | 98 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 82 deletions(-) create mode 100644 synapse/util/httpresourcetree.py (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index d2085a9405..fdadffeba7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -66,6 +66,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse.util.httpresourcetree import create_resource_tree from synapse import events @@ -174,7 +175,12 @@ class SynapseHomeServer(HomeServer): if name == "replication": resources[REPLICATION_PREFIX] = ReplicationResource(self) - root_resource = create_resource_tree(resources) + if WEB_CLIENT_PREFIX in resources: + root_resource = RootRedirect(WEB_CLIENT_PREFIX) + else: + root_resource = Resource() + + root_resource = create_resource_tree(resources, root_resource) if tls: reactor.listenSSL( port, @@ -494,87 +500,6 @@ class SynapseSite(Site): pass -def create_resource_tree(desired_tree, redirect_root_to_web_client=True): - """Create the resource tree for this Home Server. - - This in unduly complicated because Twisted does not support putting - child resources more than 1 level deep at a time. - - Args: - web_client (bool): True to enable the web client. - redirect_root_to_web_client (bool): True to redirect '/' to the - location of the web client. This does nothing if web_client is not - True. - """ - if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree: - root_resource = RootRedirect(WEB_CLIENT_PREFIX) - else: - root_resource = Resource() - - # ideally we'd just use getChild and putChild but getChild doesn't work - # unless you give it a Request object IN ADDITION to the name :/ So - # instead, we'll store a copy of this mapping so we can actually add - # extra resources to existing nodes. See self._resource_id for the key. - resource_mappings = {} - for full_path, res in desired_tree.items(): - logger.info("Attaching %s to path %s", res, full_path) - last_resource = root_resource - for path_seg in full_path.split('/')[1:-1]: - if path_seg not in last_resource.listNames(): - # resource doesn't exist, so make a "dummy resource" - child_resource = Resource() - last_resource.putChild(path_seg, child_resource) - res_id = _resource_id(last_resource, path_seg) - resource_mappings[res_id] = child_resource - last_resource = child_resource - else: - # we have an existing Resource, use that instead. 
- res_id = _resource_id(last_resource, path_seg) - last_resource = resource_mappings[res_id] - - # =========================== - # now attach the actual desired resource - last_path_seg = full_path.split('/')[-1] - - # if there is already a resource here, thieve its children and - # replace it - res_id = _resource_id(last_resource, last_path_seg) - if res_id in resource_mappings: - # there is a dummy resource at this path already, which needs - # to be replaced with the desired resource. - existing_dummy_resource = resource_mappings[res_id] - for child_name in existing_dummy_resource.listNames(): - child_res_id = _resource_id( - existing_dummy_resource, child_name - ) - child_resource = resource_mappings[child_res_id] - # steal the children - res.putChild(child_name, child_resource) - - # finally, insert the desired resource in the right place - last_resource.putChild(last_path_seg, res) - res_id = _resource_id(last_resource, last_path_seg) - resource_mappings[res_id] = res - - return root_resource - - -def _resource_id(resource, path_seg): - """Construct an arbitrary resource ID so you can retrieve the mapping - later. - - If you want to represent resource A putChild resource B with path C, - the mapping should looks like _resource_id(A,C) = B. - - Args: - resource (Resource): The *parent* Resourceb - path_seg (str): The name of the child Resource to be attached. - Returns: - str: A unique string which can be a key to the child Resource. - """ - return "%s-%s" % (resource, path_seg) - - def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py new file mode 100644 index 0000000000..45be47159a --- /dev/null +++ b/synapse/util/httpresourcetree.py @@ -0,0 +1,98 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.web.resource import Resource + +import logging + +logger = logging.getLogger(__name__) + + +def create_resource_tree(desired_tree, root_resource): + """Create the resource tree for this Home Server. + + This is unduly complicated because Twisted does not support putting + child resources more than 1 level deep at a time. + + Args: + desired_tree (dict): A dict from full path to the Resource to + attach at that path. + root_resource (twisted.web.resource.Resource): The root + resource to add the tree to. + Returns: + twisted.web.resource.Resource: the ``root_resource`` with a tree of + child resources added to it. + """ + + # ideally we'd just use getChild and putChild but getChild doesn't work + # unless you give it a Request object IN ADDITION to the name :/ So + # instead, we'll store a copy of this mapping so we can actually add + # extra resources to existing nodes. See self._resource_id for the key.
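+ # For example, attaching a resource at "/_matrix/client/api" walks the + # intermediate segments "_matrix" and "client", creating an empty dummy + # Resource for each segment not already present, and then putChild()s + # the desired resource under the final segment, "api".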
+ resource_mappings = {} + for full_path, res in desired_tree.items(): + logger.info("Attaching %s to path %s", res, full_path) + last_resource = root_resource + for path_seg in full_path.split('/')[1:-1]: + if path_seg not in last_resource.listNames(): + # resource doesn't exist, so make a "dummy resource" + child_resource = Resource() + last_resource.putChild(path_seg, child_resource) + res_id = _resource_id(last_resource, path_seg) + resource_mappings[res_id] = child_resource + last_resource = child_resource + else: + # we have an existing Resource, use that instead. + res_id = _resource_id(last_resource, path_seg) + last_resource = resource_mappings[res_id] + + # =========================== + # now attach the actual desired resource + last_path_seg = full_path.split('/')[-1] + + # if there is already a resource here, thieve its children and + # replace it + res_id = _resource_id(last_resource, last_path_seg) + if res_id in resource_mappings: + # there is a dummy resource at this path already, which needs + # to be replaced with the desired resource. + existing_dummy_resource = resource_mappings[res_id] + for child_name in existing_dummy_resource.listNames(): + child_res_id = _resource_id( + existing_dummy_resource, child_name + ) + child_resource = resource_mappings[child_res_id] + # steal the children + res.putChild(child_name, child_resource) + + # finally, insert the desired resource in the right place + last_resource.putChild(last_path_seg, res) + res_id = _resource_id(last_resource, last_path_seg) + resource_mappings[res_id] = res + + return root_resource + + +def _resource_id(resource, path_seg): + """Construct an arbitrary resource ID so you can retrieve the mapping + later. + + If you want to represent resource A putChild resource B with path C, + the mapping should look like _resource_id(A,C) = B. + + Args: + resource (Resource): The *parent* Resource + path_seg (str): The name of the child Resource to be attached. + Returns: + str: A unique string which can be a key to the child Resource.
+ """ + return "%s-%s" % (resource, path_seg) -- cgit 1.4.1 From e856036f4c0b2744ef44a25f16b409ddb8c693e1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 Apr 2016 16:09:55 +0100 Subject: Move SynapseSite to its own file --- synapse/app/homeserver.py | 133 +---------------------------------------- synapse/http/site.py | 146 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 130 deletions(-) create mode 100644 synapse/http/site.py (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index d2085a9405..2818c55b7a 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -16,12 +16,9 @@ import synapse -import contextlib import logging import os -import re import sys -import time from synapse.config._base import ConfigError from synapse.python_dependencies import ( @@ -46,7 +43,7 @@ from twisted.internet import reactor, task, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File -from twisted.web.server import Site, GzipEncoderFactory, Request +from twisted.web.server import GzipEncoderFactory from synapse.http.server import RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource @@ -67,6 +64,8 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse.http.site import SynapseSite + from synapse import events from daemonize import Daemonize @@ -74,9 +73,6 @@ from daemonize import Daemonize logger = logging.getLogger("synapse.app.homeserver") -ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') - - def gz_wrap(r): return EncodingResourceWrapper(r, [GzipEncoderFactory()]) @@ -371,129 +367,6 @@ class SynapseService(service.Service): return self._port.stopListening() -class SynapseRequest(Request): - def __init__(self, site, *args, **kw): - Request.__init__(self, *args, **kw) - self.site = site - self.authenticated_entity = None - self.start_time = 0 - - def __repr__(self): - # We overwrite this so that we don't log ``access_token`` - return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( - self.__class__.__name__, - id(self), - self.method, - self.get_redacted_uri(), - self.clientproto, - self.site.site_tag, - ) - - def get_redacted_uri(self): - return ACCESS_TOKEN_RE.sub( - r'\1\3', - self.uri - ) - - def get_user_agent(self): - return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] - - def started_processing(self): - self.site.access_logger.info( - "%s - %s - Received request: %s %s", - self.getClientIP(), - self.site.site_tag, - self.method, - self.get_redacted_uri() - ) - self.start_time = int(time.time() * 1000) - - def finished_processing(self): - - try: - context = LoggingContext.current_context() - ru_utime, ru_stime = context.get_resource_usage() - db_txn_count = context.db_txn_count - db_txn_duration = context.db_txn_duration - except: - ru_utime, ru_stime = (0, 0) - db_txn_count, db_txn_duration = (0, 0) - - self.site.access_logger.info( - "%s - %s - {%s}" - " Processed request: %dms (%dms, %dms) (%dms/%d)" - " %sB %s \"%s %s %s\" \"%s\"", - self.getClientIP(), - self.site.site_tag, - self.authenticated_entity, - int(time.time() * 1000) - self.start_time, - int(ru_utime * 1000), - int(ru_stime * 1000), - 
int(db_txn_duration * 1000), - int(db_txn_count), - self.sentLength, - self.code, - self.method, - self.get_redacted_uri(), - self.clientproto, - self.get_user_agent(), - ) - - @contextlib.contextmanager - def processing(self): - self.started_processing() - yield - self.finished_processing() - - -class XForwardedForRequest(SynapseRequest): - def __init__(self, *args, **kw): - SynapseRequest.__init__(self, *args, **kw) - - """ - Add a layer on top of another request that only uses the value of an - X-Forwarded-For header as the result of C{getClientIP}. - """ - def getClientIP(self): - """ - @return: The client address (the first address) in the value of the - I{X-Forwarded-For header}. If the header is not present, return - C{b"-"}. - """ - return self.requestHeaders.getRawHeaders( - b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() - - -class SynapseRequestFactory(object): - def __init__(self, site, x_forwarded_for): - self.site = site - self.x_forwarded_for = x_forwarded_for - - def __call__(self, *args, **kwargs): - if self.x_forwarded_for: - return XForwardedForRequest(self.site, *args, **kwargs) - else: - return SynapseRequest(self.site, *args, **kwargs) - - -class SynapseSite(Site): - """ - Subclass of a twisted http Site that does access logging with python's - standard logging - """ - def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): - Site.__init__(self, resource, *args, **kwargs) - - self.site_tag = site_tag - - proxied = config.get("x_forwarded", False) - self.requestFactory = SynapseRequestFactory(self, proxied) - self.access_logger = logging.getLogger(logger_name) - - def log(self, request): - pass - - def create_resource_tree(desired_tree, redirect_root_to_web_client=True): """Create the resource tree for this Home Server. diff --git a/synapse/http/site.py b/synapse/http/site.py new file mode 100644 index 0000000000..4b09d7ee66 --- /dev/null +++ b/synapse/http/site.py @@ -0,0 +1,146 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
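+ +# Split out of homeserver.py: SynapseRequest redacts access_token values +# from logged request URIs and records per-request timing and resource +# usage; SynapseSite routes twisted's access logging through python's +# standard logging.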
+ +from synapse.util.logcontext import LoggingContext +from twisted.web.server import Site, Request + +import contextlib +import logging +import re +import time + +ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') + + +class SynapseRequest(Request): + def __init__(self, site, *args, **kw): + Request.__init__(self, *args, **kw) + self.site = site + self.authenticated_entity = None + self.start_time = 0 + + def __repr__(self): + # We overwrite this so that we don't log ``access_token`` + return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( + self.__class__.__name__, + id(self), + self.method, + self.get_redacted_uri(), + self.clientproto, + self.site.site_tag, + ) + + def get_redacted_uri(self): + return ACCESS_TOKEN_RE.sub( + r'\1\3', + self.uri + ) + + def get_user_agent(self): + return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + + def started_processing(self): + self.site.access_logger.info( + "%s - %s - Received request: %s %s", + self.getClientIP(), + self.site.site_tag, + self.method, + self.get_redacted_uri() + ) + self.start_time = int(time.time() * 1000) + + def finished_processing(self): + + try: + context = LoggingContext.current_context() + ru_utime, ru_stime = context.get_resource_usage() + db_txn_count = context.db_txn_count + db_txn_duration = context.db_txn_duration + except: + ru_utime, ru_stime = (0, 0) + db_txn_count, db_txn_duration = (0, 0) + + self.site.access_logger.info( + "%s - %s - {%s}" + " Processed request: %dms (%dms, %dms) (%dms/%d)" + " %sB %s \"%s %s %s\" \"%s\"", + self.getClientIP(), + self.site.site_tag, + self.authenticated_entity, + int(time.time() * 1000) - self.start_time, + int(ru_utime * 1000), + int(ru_stime * 1000), + int(db_txn_duration * 1000), + int(db_txn_count), + self.sentLength, + self.code, + self.method, + self.get_redacted_uri(), + self.clientproto, + self.get_user_agent(), + ) + + @contextlib.contextmanager + def processing(self): + self.started_processing() + yield + self.finished_processing() + + +class XForwardedForRequest(SynapseRequest): + def __init__(self, *args, **kw): + SynapseRequest.__init__(self, *args, **kw) + + """ + Add a layer on top of another request that only uses the value of an + X-Forwarded-For header as the result of C{getClientIP}. + """ + def getClientIP(self): + """ + @return: The client address (the first address) in the value of the + I{X-Forwarded-For header}. If the header is not present, return + C{b"-"}. 
+ """ + return self.requestHeaders.getRawHeaders( + b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() + + +class SynapseRequestFactory(object): + def __init__(self, site, x_forwarded_for): + self.site = site + self.x_forwarded_for = x_forwarded_for + + def __call__(self, *args, **kwargs): + if self.x_forwarded_for: + return XForwardedForRequest(self.site, *args, **kwargs) + else: + return SynapseRequest(self.site, *args, **kwargs) + + +class SynapseSite(Site): + """ + Subclass of a twisted http Site that does access logging with python's + standard logging + """ + def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): + Site.__init__(self, resource, *args, **kwargs) + + self.site_tag = site_tag + + proxied = config.get("x_forwarded", False) + self.requestFactory = SynapseRequestFactory(self, proxied) + self.access_logger = logging.getLogger(logger_name) + + def log(self, request): + pass -- cgit 1.4.1 From 5905f36f0557f2b496e5b2759db295a3b2807574 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 Apr 2016 17:08:02 +0100 Subject: Split out setting up the manhole to a separate file --- synapse/app/homeserver.py | 33 +++++++------------------------ synapse/util/manhole.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 26 deletions(-) create mode 100644 synapse/util/manhole.py (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 1fa93be93e..b033073ef7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,13 +32,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_d from synapse.server import HomeServer - -from twisted.conch.manhole import ColoredManhole -from twisted.conch.insults import insults -from twisted.conch import manhole_ssh -from twisted.cred import checkers, portal - - from twisted.internet import reactor, task, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper @@ -64,6 +57,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.manhole import listen_manhole from synapse.http.site import SynapseSite @@ -209,25 +203,12 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( - matrix="rabbithole" - ) - - rlm = manhole_ssh.TerminalRealm() - rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( - ColoredManhole, - { - "__name__": "__console__", - "hs": self, - } - ) - - f = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - - reactor.listenTCP( - listener["port"], - f, - interface=listener.get("bind_address", '127.0.0.1') + listen_manhole( + bind_address=listener.get("bind_address", '127.0.0.1'), + bind_port=listener["port"], + username="matrix", + password="rabbithole", + globals={"hs": self}, ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py new file mode 100644 index 0000000000..e12583209f --- /dev/null +++ b/synapse/util/manhole.py @@ -0,0 +1,50 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance 
with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.conch.manhole import ColoredManhole +from twisted.conch.insults import insults +from twisted.conch import manhole_ssh +from twisted.cred import checkers, portal + + +from twisted.internet import reactor + + +def listen_manhole(bind_address, bind_port, username, password, globals): + """Starts a ssh listener with password authentication using + the given username and password. Clients connecting to the ssh + listener will find themselves in a colored python shell with + the supplied globals. + + Args: + bind_address(str): IP address to listen on. + bind_port(int): TCP port to listen on. + username(str): The username ssh clients should auth with. + password(str): The password ssh clients should auth with. + globals(dict): The variables to expose in the shell. + """ + + checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( + **{username: password} + ) + + rlm = manhole_ssh.TerminalRealm() + rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( + ColoredManhole, + dict(globals, __name__="__console__") + ) + + factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + + reactor.listenTCP(bind_port, factory, interface=bind_address) -- cgit 1.4.1 From 52ecbc2843de51b3685529e63cd3815e826b6d90 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Apr 2016 14:30:15 +0100 Subject: Make pyjwt dependency optional --- synapse/config/jwt.py | 17 ++++++++++++++++- synapse/python_dependencies.py | 1 - synapse/rest/client/v1/login.py | 12 +++++++----- 3 files changed, 23 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/config/jwt.py b/synapse/config/jwt.py index 5c8199612b..47f145c589 100644 --- a/synapse/config/jwt.py +++ b/synapse/config/jwt.py @@ -13,7 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import Config +from ._base import Config, ConfigError + + +MISSING_JWT = ( + """Missing jwt library. This is required for jwt login. + + Install by running: + pip install pyjwt + """ +) class JWTConfig(Config): @@ -23,6 +32,12 @@ class JWTConfig(Config): self.jwt_enabled = jwt_config.get("enabled", False) self.jwt_secret = jwt_config["secret"] self.jwt_algorithm = jwt_config["algorithm"] + + try: + import jwt + jwt # To stop unused lint. 
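+ # Probing the import here, at config-parse time, means a homeserver + # configured for jwt login without pyjwt installed fails fast at + # startup with the MISSING_JWT message rather than at the first + # login attempt.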
+ except ImportError: + raise ConfigError(MISSING_JWT) else: self.jwt_enabled = False self.jwt_secret = None diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index b25b736493..0eb3d6c1de 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -36,7 +36,6 @@ REQUIREMENTS = { "blist": ["blist"], "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], - "pyjwt": ["jwt"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 166a78026a..3b5544851b 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -33,9 +33,6 @@ from saml2.client import Saml2Client import xml.etree.ElementTree as ET -import jwt -from jwt.exceptions import InvalidTokenError - logger = logging.getLogger(__name__) @@ -226,8 +223,13 @@ class LoginRestServlet(ClientV1RestServlet): def do_jwt_login(self, login_submission): token = login_submission.get("token", None) if token is None: - raise LoginError(401, "Token field for JWT is missing", - errcode=Codes.UNAUTHORIZED) + raise LoginError( + 401, "Token field for JWT is missing", + errcode=Codes.UNAUTHORIZED + ) + + import jwt + from jwt.exceptions import InvalidTokenError try: payload = jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm]) -- cgit 1.4.1 From f22f46f4f902e071fe322854a228f8fe53677cdc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 14:59:21 +0100 Subject: Move the listenTCP call outside the manhole function --- synapse/app/homeserver.py | 16 +++++++++------- synapse/util/manhole.py | 14 +++++--------- 2 files changed, 14 insertions(+), 16 deletions(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b033073ef7..df675c0ed4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -57,7 +57,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import listen_manhole +from synapse.util.manhole import manhole from synapse.http.site import SynapseSite @@ -203,12 +203,14 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - listen_manhole( - bind_address=listener.get("bind_address", '127.0.0.1'), - bind_port=listener["port"], - username="matrix", - password="rabbithole", - globals={"hs": self}, + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index e12583209f..9b106cdf47 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -18,21 +18,19 @@ from twisted.conch import manhole_ssh from twisted.cred import checkers, portal -from twisted.internet import reactor - - -def listen_manhole(bind_address, bind_port, username, password, globals): +def manhole(username, password, globals): """Starts a ssh listener with password authentication using the given username and password. Clients connecting to the ssh listener will find themselves in a colored python shell with the supplied globals. 
Args: - bind_address(str): IP address to listen on. - bind_port(int): TCP port to listen on. username(str): The username ssh clients should auth with. password(str): The password ssh clients should auth with. globals(dict): The variables to expose in the shell. + + Returns: + twisted.internet.protocol.Factory: A factory to pass to ``listenTCP`` """ checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( @@ -45,6 +43,4 @@ def listen_manhole(bind_address, bind_port, username, password, globals): dict(globals, __name__="__console__") ) - factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - - reactor.listenTCP(bind_port, factory, interface=bind_address) + return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) -- cgit 1.4.1 From 72e2fafa207c28581c62bcce2f1a6ede410fee5a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 17:34:25 +0100 Subject: Add a metrics listener and a ssh listener to the pusher --- synapse/app/pusher.py | 69 +++++++++++++++++++++++++++++++++++++++++++++++-- synapse/util/manhole.py | 26 ++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8922573db7..abb9f1fe8e 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -17,19 +17,25 @@ import synapse from synapse.server import HomeServer -from synapse.util.versionstring import get_version_string from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep -from synapse.util.logcontext import (LoggingContext, preserve_fn) +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer +from twisted.web.resource import Resource import sys import logging @@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig): ) self.user_agent_suffix = None self.start_pushers = True + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") def default_config(self, **kwargs): return """\ ## Slave ## + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + listeners: [] + # Uncomment to enable a ssh manhole listener on the pusher. + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Uncomment to enable a metric listener on the pusher. 
+ # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"], + # compress: False + report_stats: False """ @@ -100,6 +122,46 @@ class PusherServer(HomeServer): }] }) + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse pusher now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + @defer.inlineCallbacks def replicate(self): http_client = self.get_simple_http_client() @@ -191,6 +253,9 @@ def setup(config_options): ) ps.setup() + ps.start_listening() + + change_resource_limit(ps.config.soft_file_limit) def start(): ps.replicate() diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index 9b106cdf47..97e0f00b67 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -16,6 +16,26 @@ from twisted.conch.manhole import ColoredManhole from twisted.conch.insults import insults from twisted.conch import manhole_ssh from twisted.cred import checkers, portal +from twisted.conch.ssh.keys import Key + +PUBLIC_KEY = ( + "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az" + "64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJS" + "kbh/C+BR3utDS555mV" +) + +PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY----- +MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW +4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw +vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb +Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1 +xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8 +PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2 +gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu +DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML +pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP +EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg== +-----END RSA PRIVATE KEY-----""" def manhole(username, password, globals): @@ -43,4 +63,8 @@ def manhole(username, password, globals): dict(globals, __name__="__console__") ) - return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY) + factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY) + + return factory -- cgit 1.4.1 From f15e9e8de4329ed7f726d3405c8aa37a52c8db24 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 17:56:24 +0100 Subject: Remove the uncomments from the comments --- synapse/app/pusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 
'synapse') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index abb9f1fe8e..e6aa0aa65c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -62,11 +62,11 @@ class SlaveConfig(DatabaseConfig): #replication_url: https://localhost:{replication_port}/_synapse/replication listeners: [] - # Uncomment to enable a ssh manhole listener on the pusher. + # Enable a ssh manhole listener on the pusher. # - type: manhole # port: {manhole_port} # bind_address: 127.0.0.1 - # Uncomment to enable a metric listener on the pusher. + # Enable a metric listener on the pusher. # - type: http # port: {metrics_port} # bind_address: 127.0.0.1 -- cgit 1.4.1 From 9c417c54d461be6877b58220311c7c1bb83dabb1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 10:45:02 +0100 Subject: Add a couple of update methods to the PusherSlaveStore --- synapse/app/pusher.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index e6aa0aa65c..9381fe2251 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -89,6 +89,14 @@ class PusherSlaveStore( DataStore.update_pusher_last_stream_ordering_and_success.__func__ ) + update_pusher_failing_since = ( + DataStore.update_pusher_failing_since.__func__ + ) + + update_pusher_last_stream_ordering = ( + DataStore.update_pusher_last_stream_ordering.__func__ + ) + class PusherServer(HomeServer): -- cgit 1.4.1 From 6df5a6a833620a6388c90850be4d457bc4c5c4eb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 15:37:41 +0100 Subject: Optionally daemonize the pusher --- synapse/app/pusher.py | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9381fe2251..5f3200cf4c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -37,6 +37,8 @@ from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer from twisted.web.resource import Resource +from daemonize import Daemonize + import sys import logging @@ -54,13 +56,19 @@ class SlaveConfig(DatabaseConfig): self.start_pushers = True self.listeners = config["listeners"] self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) - def default_config(self, **kwargs): + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("pusher.pid") return """\ - ## Slave ## + # Slave configuration + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + server_name: "%(server_name)s" + listeners: [] # Enable a ssh manhole listener on the pusher. 
# - type: manhole # port: {manhole_port} # bind_address: 127.0.0.1 # Enable a metric listener on the pusher. # - type: http # port: {metrics_port} # bind_address: 127.0.0.1 # resources: # - names: ["metrics"] # compress: False report_stats: False - """ + + daemonize: False + + pid_file: %(pid_file)s + + """ % locals() class PusherSlaveConfig(SlaveConfig, LoggingConfig): @@ -248,6 +261,9 @@ def setup(config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) + if not config: + sys.exit(0) + config.setup_logging() database_engine = create_engine(config.database_config) @@ -278,4 +294,15 @@ def setup(config_options): if __name__ == '__main__': with LoggingContext("main"): ps = setup(sys.argv[1:]) - reactor.run() + + if ps.config.daemonize: + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=reactor.run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + else: + reactor.run() -- cgit 1.4.1 From b80b93ea0f47f9854bc093c72f4f0bd42898fabe Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 15:57:28 +0100 Subject: Add a log context to the daemonized pusher --- synapse/app/pusher.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 5f3200cf4c..a67650b5d1 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -296,10 +296,15 @@ if __name__ == '__main__': ps = setup(sys.argv[1:]) if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + reactor.run() + daemon = Daemonize( app="synapse-pusher", pid=ps.config.pid_file, - action=reactor.run, + action=run, auto_close_fds=False, verbose=True, logger=logger, ) -- cgit 1.4.1 From c9eab73f2a5c0e61ffd0de46d8bd4750f53e7ccb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 17:06:04 +0100 Subject: Fix typo in default pusher config --- synapse/app/pusher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index a67650b5d1..230156559e 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -79,7 +79,7 @@ class SlaveConfig(DatabaseConfig): # port: {metrics_port} # bind_address: 127.0.0.1 # resources: - # - names: ["metrics"], + # - names: ["metrics"] # compress: False report_stats: False -- cgit 1.4.1 From 71df32719050892eae42cd741eaffc3dcf2eb603 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 17:07:09 +0100 Subject: Actually start the pusher daemon --- synapse/app/pusher.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 230156559e..b5339f030d 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -309,5 +309,7 @@ if __name__ == '__main__': verbose=True, logger=logger, ) + + daemon.start() else: reactor.run() -- cgit 1.4.1 From 871357d539bfbaf5552a098de2253600bf5f3a51 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 27 Apr 2016 11:54:13 +0100 Subject: Check that something has happened before running the selects --- synapse/storage/events.py | 10 ++++++++-- synapse/storage/pusher.py | 3 +++ synapse/storage/receipts.py | 3 +++ 3 files changed, 14 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 21487724ed..0307b2af3c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1143,6 +1143,12 @@ class EventsStore(SQLBaseStore): current_backfill_id, current_forward_id, limit): """Get all the new events that have arrived at the server either as new events or as
backfilled events""" + have_backfill_events = last_backfill_id != current_backfill_id + have_forward_events = last_forward_id != current_forward_id + + if not have_backfill_events and not have_forward_events: + return defer.succeed(AllNewEventsResult([], [], [], [], [])) + def get_all_new_events_txn(txn): sql = ( "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group" @@ -1155,7 +1161,7 @@ class EventsStore(SQLBaseStore): " ORDER BY e.stream_ordering ASC" " LIMIT ?" ) - if last_forward_id != current_forward_id: + if have_forward_events: txn.execute(sql, (last_forward_id, current_forward_id, limit)) new_forward_events = txn.fetchall() @@ -1199,7 +1205,7 @@ class EventsStore(SQLBaseStore): " ORDER BY e.stream_ordering DESC" " LIMIT ?" ) - if last_backfill_id != current_backfill_id: + if have_backfill_events: txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit)) new_backfill_events = txn.fetchall() diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index e5755c0aea..11feb3eb11 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -106,6 +106,9 @@ class PusherStore(SQLBaseStore): return self._pushers_id_gen.get_current_token() def get_all_updated_pushers(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed(([], [])) + def get_all_updated_pushers_txn(txn): sql = ( "SELECT id, user_name, access_token, profile_tag, kind," diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 3b8805593e..935fc503d9 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -391,6 +391,9 @@ class ReceiptsStore(SQLBaseStore): ) def get_all_updated_receipts(self, last_id, current_id, limit=None): + if last_id == current_id: + return defer.succeed([]) + def get_all_updated_receipts_txn(txn): sql = ( "SELECT stream_id, room_id, receipt_type, user_id, event_id, data" -- cgit 1.4.1 From 8a656664544fbc23db618aa855cc61ac54d9afeb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 27 Apr 2016 15:38:43 +0100 Subject: Fix backfill replication to advance the stream correctly --- synapse/replication/resource.py | 2 +- synapse/replication/slave/storage/events.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index e5c9a53929..149fc4c650 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -382,7 +382,7 @@ class _Writer(object): position = rows[-1][0] self.streams[name] = { - "position": str(position), + "position": position if type(position) is int else str(position), "field_names": fields, "rows": rows, } diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 5f37ba6995..86f00b6ff5 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -118,7 +118,7 @@ class SlavedEventStore(BaseSlavedStore): def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() - result["backfill"] = self._backfill_id_gen.get_current_token() + result["backfill"] = -self._backfill_id_gen.get_current_token() return result def process_replication(self, result): @@ -136,7 +136,7 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("backfill") if stream: - self._backfill_id_gen.advance(stream["position"]) + self._backfill_id_gen.advance(-stream["position"]) for row in 
stream["rows"]: self._process_replication_row( row, backfilled=True, state_resets=state_resets -- cgit 1.4.1 From 8d7ad44331d7eff4a140b1e4777532d8a3fb26cb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Apr 2016 10:57:49 +0100 Subject: Report per request metrics for all of the things using request_handler --- synapse/http/server.py | 101 ++++++++++++++++---------- synapse/replication/pusher_resource.py | 3 +- synapse/replication/resource.py | 3 +- synapse/rest/key/v1/server_key_resource.py | 1 - synapse/rest/key/v2/remote_key_resource.py | 4 +- synapse/rest/media/v1/download_resource.py | 3 +- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/rest/media/v1/thumbnail_resource.py | 3 +- synapse/rest/media/v1/upload_resource.py | 3 +- 9 files changed, 76 insertions(+), 47 deletions(-) (limited to 'synapse') diff --git a/synapse/http/server.py b/synapse/http/server.py index b82196fd5e..d4d639f617 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -74,7 +74,12 @@ response_db_txn_duration = metrics.register_distribution( _next_request_id = 0 -def request_handler(request_handler): +def request_handler(report_metrics=True): + """Decorator for ``wrap_request_handler``""" + return lambda request_handler: wrap_request_handler(request_handler, report_metrics) + + +def wrap_request_handler(request_handler, report_metrics): """Wraps a method that acts as a request handler with the necessary logging and exception handling. @@ -96,6 +101,10 @@ def request_handler(request_handler): global _next_request_id request_id = "%s-%s" % (request.method, _next_request_id) _next_request_id += 1 + if report_metrics: + request_metrics = RequestMetrics() + request_metrics.start(self.clock) + with LoggingContext(request_id) as request_context: request_context.request = request_id with request.processing(): @@ -133,6 +142,13 @@ def request_handler(request_handler): }, send_cors=True ) + finally: + try: + request_metrics.stop( + self.clock, request, self.__class__.__name__ + ) + except: + pass return wrapped_request_handler @@ -197,19 +213,19 @@ class JsonResource(HttpServer, resource.Resource): self._async_render(request) return server.NOT_DONE_YET - @request_handler + @request_handler(report_metrics=False) @defer.inlineCallbacks def _async_render(self, request): """ This gets called from render() every time someone sends us a request. This checks if anyone has registered a callback for that method and path. 
""" - start = self.clock.time_msec() if request.method == "OPTIONS": self._send_response(request, 200, {}) return - start_context = LoggingContext.current_context() + request_metrics = RequestMetrics() + request_metrics.start(self.clock) # Loop through all the registered callbacks to check if the method # and path regex match @@ -241,40 +257,7 @@ class JsonResource(HttpServer, resource.Resource): self._send_response(request, code, response) try: - context = LoggingContext.current_context() - - tag = "" - if context: - tag = context.tag - - if context != start_context: - logger.warn( - "Context have unexpectedly changed %r, %r", - context, self.start_context - ) - return - - incoming_requests_counter.inc(request.method, servlet_classname, tag) - - response_timer.inc_by( - self.clock.time_msec() - start, request.method, - servlet_classname, tag - ) - - ru_utime, ru_stime = context.get_resource_usage() - - response_ru_utime.inc_by( - ru_utime, request.method, servlet_classname, tag - ) - response_ru_stime.inc_by( - ru_stime, request.method, servlet_classname, tag - ) - response_db_txn_count.inc_by( - context.db_txn_count, request.method, servlet_classname, tag - ) - response_db_txn_duration.inc_by( - context.db_txn_duration, request.method, servlet_classname, tag - ) + request_metrics.stop(self.clock, request, servlet_classname) except: pass @@ -307,6 +290,48 @@ class JsonResource(HttpServer, resource.Resource): ) +class RequestMetrics(object): + def start(self, clock): + self.start = clock.time_msec() + self.start_context = LoggingContext.current_context() + + def stop(self, clock, request, servlet_classname): + context = LoggingContext.current_context() + + tag = "" + if context: + tag = context.tag + + if context != start_context: + logger.warn( + "Context have unexpectedly changed %r, %r", + context, self.start_context + ) + return + + incoming_requests_counter.inc(request.method, servlet_classname, tag) + + response_timer.inc_by( + self.clock.time_msec() - start, request.method, + servlet_classname, tag + ) + + ru_utime, ru_stime = context.get_resource_usage() + + response_ru_utime.inc_by( + ru_utime, request.method, servlet_classname, tag + ) + response_ru_stime.inc_by( + ru_stime, request.method, servlet_classname, tag + ) + response_db_txn_count.inc_by( + context.db_txn_count, request.method, servlet_classname, tag + ) + response_db_txn_duration.inc_by( + context.db_txn_duration, request.method, servlet_classname, tag + ) + + class RootRedirect(resource.Resource): """Redirects the root '/' path to another path.""" diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py index b87026d79a..9b01ab3c13 100644 --- a/synapse/replication/pusher_resource.py +++ b/synapse/replication/pusher_resource.py @@ -31,12 +31,13 @@ class PusherResource(Resource): self.version_string = hs.version_string self.store = hs.get_datastore() self.notifier = hs.get_notifier() + self.clock = hs.get_clock() def render_POST(self, request): self._async_render_POST(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_POST(self, request): content = parse_json_object_from_request(request) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 149fc4c650..ff78c60f13 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -112,6 +112,7 @@ class ReplicationResource(Resource): self.presence_handler = hs.get_handlers().presence_handler self.typing_handler = 
hs.get_handlers().typing_notification_handler self.notifier = hs.notifier + self.clock = hs.get_clock() self.putChild("remove_pushers", PusherResource(hs)) @@ -139,7 +140,7 @@ class ReplicationResource(Resource): state_token, )) - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): limit = parse_integer(request, "limit", 100) diff --git a/synapse/rest/key/v1/server_key_resource.py b/synapse/rest/key/v1/server_key_resource.py index 3db3838b7e..bd4fea5774 100644 --- a/synapse/rest/key/v1/server_key_resource.py +++ b/synapse/rest/key/v1/server_key_resource.py @@ -49,7 +49,6 @@ class LocalKey(Resource): """ def __init__(self, hs): - self.hs = hs self.version_string = hs.version_string self.response_body = encode_canonical_json( self.response_json_object(hs.config) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 9552016fec..7209d5a37d 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -97,7 +97,7 @@ class RemoteKey(Resource): self.async_render_GET(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def async_render_GET(self, request): if len(request.postpath) == 1: @@ -122,7 +122,7 @@ class RemoteKey(Resource): self.async_render_POST(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def async_render_POST(self, request): content = parse_json_object_from_request(request) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 510884262c..9f69620772 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -36,12 +36,13 @@ class DownloadResource(Resource): self.server_name = hs.hostname self.store = hs.get_datastore() self.version_string = hs.version_string + self.clock = hs.get_clock() def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): server_name, media_id, name = parse_media_id(request) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 69327ac493..dc1e5fbdb3 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -74,7 +74,7 @@ class PreviewUrlResource(Resource): self._async_render_GET(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 234dd4261c..0b9e1de1a7 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -39,12 +39,13 @@ class ThumbnailResource(Resource): self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.server_name = hs.hostname self.version_string = hs.version_string + self.clock = hs.get_clock() def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): server_name, media_id, _ = parse_media_id(request) diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 299e1f6e56..b716d1d892 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ 
b/synapse/rest/media/v1/upload_resource.py @@ -41,6 +41,7 @@ class UploadResource(Resource): self.auth = hs.get_auth() self.max_upload_size = hs.config.max_upload_size self.version_string = hs.version_string + self.clock = hs.get_clock() def render_POST(self, request): self._async_render_POST(request) @@ -50,7 +51,7 @@ class UploadResource(Resource): respond_with_json(request, 200, {}, send_cors=True) return NOT_DONE_YET - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_POST(self, request): requester = yield self.auth.get_user_by_req(request) -- cgit 1.4.1 From 6037349512d7fee303adad908b01d0a990715833 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Apr 2016 12:25:48 +0100 Subject: Check if report_metrics is True --- synapse/http/server.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/http/server.py b/synapse/http/server.py index d4d639f617..2c131a7017 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -144,9 +144,10 @@ def wrap_request_handler(request_handler, report_metrics): ) finally: try: - request_metrics.stop( - self.clock, request, self.__class__.__name__ - ) + if report_metrics: + request_metrics.stop( + self.clock, request, self.__class__.__name__ + ) except: pass return wrapped_request_handler -- cgit 1.4.1 From 1a12766e3bfe76681af9ef88cbb6fbf22bb50500 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Apr 2016 12:31:26 +0100 Subject: Add a comment explaining why automatic metric reporting is disabled for JsonResource --- synapse/http/server.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse') diff --git a/synapse/http/server.py b/synapse/http/server.py index 2c131a7017..29241fa145 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -214,6 +214,10 @@ class JsonResource(HttpServer, resource.Resource): self._async_render(request) return server.NOT_DONE_YET + # Disable metric reporting because _async_render does its own metrics. + # It does its own metric reporting because _async_render dispatches to + # a callback and it's the class name of that callback we want to report + # against rather than the JsonResource itself. 
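+ # (_async_render creates a RequestMetrics instance itself and passes the + # matched servlet's class name to stop(), so the stats are broken down + # per servlet rather than lumped together under JsonResource.)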
@request_handler(report_metrics=False) @defer.inlineCallbacks def _async_render(self, request): -- cgit 1.4.1 From aebd0c97179adb867401ce8b8e558758e7b87243 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 28 Apr 2016 15:09:11 +0100 Subject: fix typo --- synapse/handlers/room_member.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b69f36aefe..ed2cda837f 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -232,7 +232,7 @@ class RoomMemberHandler(BaseHandler): if old_membership == "ban" and action != "unban": raise SynapseError( 403, - "Cannot %s user who was is banned" % (action,), + "Cannot %s user who was banned" % (action,), errcode=Codes.BAD_STATE ) -- cgit 1.4.1 From dcfc10b12995f4f3f5d751093f3be04e15c66f65 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Apr 2016 15:10:14 +0100 Subject: Fix typo in request metrics --- synapse/http/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/http/server.py b/synapse/http/server.py index 29241fa145..35ec01678f 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -317,7 +317,7 @@ class RequestMetrics(object): incoming_requests_counter.inc(request.method, servlet_classname, tag) response_timer.inc_by( - self.clock.time_msec() - start, request.method, + clock.time_msec() - self.start, request.method, servlet_classname, tag ) -- cgit 1.4.1 From 351b50a887add49c5233a15e6d6a2e994b7d2e9b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Apr 2016 15:29:46 +0100 Subject: Fix more typos in per-request metrics --- synapse/http/server.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/http/server.py b/synapse/http/server.py index 35ec01678f..f705abab94 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -101,11 +101,12 @@ def wrap_request_handler(request_handler, report_metrics): global _next_request_id request_id = "%s-%s" % (request.method, _next_request_id) _next_request_id += 1 - if report_metrics: - request_metrics = RequestMetrics() - request_metrics.start(self.clock) with LoggingContext(request_id) as request_context: + if report_metrics: + request_metrics = RequestMetrics() + request_metrics.start(self.clock) + request_context.request = request_id with request.processing(): try: @@ -307,7 +308,7 @@ class RequestMetrics(object): if context: tag = context.tag - if context != start_context: + if context != self.start_context: logger.warn( "Context have unexpectedly changed %r, %r", context, self.start_context -- cgit 1.4.1
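For reference, the manhole factory introduced in these patches is wired up as in the sketch below. This is a minimal example: the port, credentials, and empty globals are illustrative stand-ins, mirroring the defaults that homeserver.py and pusher.py pass in, not values mandated by the patches themselves.

from twisted.internet import reactor

from synapse.util.manhole import manhole

# manhole() returns a ConchFactory, so the caller owns the listening
# socket; connect with e.g.: ssh -p 9000 matrix@127.0.0.1
reactor.listenTCP(
    9000,
    manhole(
        username="matrix",
        password="rabbithole",
        globals={},  # objects to expose in the shell, e.g. {"hs": hs}
    ),
    interface="127.0.0.1",
)
reactor.run()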