From 650f0e69f2ff1c15739868c0e1a639d70ac13dbf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Mar 2017 13:03:50 +0100 Subject: Compile the regex's used in ASes --- tests/appservice/test_appservice.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'tests') diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index aa8cc50550..7586ea9053 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -19,10 +19,12 @@ from twisted.internet import defer from mock import Mock from tests import unittest +import re + def _regex(regex, exclusive=True): return { - "regex": regex, + "regex": re.compile(regex), "exclusive": exclusive } -- cgit 1.4.1 From eefd9fee81428ecec311999980bb4213b6aac2df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Mar 2017 11:19:15 +0100 Subject: Fix up tests --- tests/storage/test__base.py | 2 +- tests/util/caches/test_descriptors.py | 38 +++++++++++++++++++++++++++++++++++ tests/util/test_snapshot_cache.py | 4 +++- 3 files changed, 42 insertions(+), 2 deletions(-) (limited to 'tests') diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 8361dd8cee..281eb16254 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -199,7 +199,7 @@ class CacheDecoratorTestCase(unittest.TestCase): a.func.prefill(("foo",), ObservableDeferred(d)) - self.assertEquals(a.func("foo").result, d.result) + self.assertEquals(a.func("foo"), d.result) self.assertEquals(callcount[0], 0) @defer.inlineCallbacks diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 4414e86771..3f14ab503f 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -175,3 +175,41 @@ class DescriptorTestCase(unittest.TestCase): logcontext.LoggingContext.sentinel) return d1 + + @defer.inlineCallbacks + def test_cache_default_args(self): + class Cls(object): + def __init__(self): + self.mock = mock.Mock() + + @descriptors.cached() + def fn(self, arg1, arg2=2, arg3=3): + return self.mock(arg1, arg2, arg3) + + obj = Cls() + + obj.mock.return_value = 'fish' + r = yield obj.fn(1, 2, 3) + self.assertEqual(r, 'fish') + obj.mock.assert_called_once_with(1, 2, 3) + obj.mock.reset_mock() + + # a call with same params shouldn't call the mock again + r = yield obj.fn(1, 2) + self.assertEqual(r, 'fish') + obj.mock.assert_not_called() + obj.mock.reset_mock() + + # a call with different params should call the mock again + obj.mock.return_value = 'chips' + r = yield obj.fn(2, 3) + self.assertEqual(r, 'chips') + obj.mock.assert_called_once_with(2, 3, 3) + obj.mock.reset_mock() + + # the two values should now be cached + r = yield obj.fn(1, 2) + self.assertEqual(r, 'fish') + r = yield obj.fn(2, 3) + self.assertEqual(r, 'chips') + obj.mock.assert_not_called() diff --git a/tests/util/test_snapshot_cache.py b/tests/util/test_snapshot_cache.py index 7e289715ba..d3a8630c2f 100644 --- a/tests/util/test_snapshot_cache.py +++ b/tests/util/test_snapshot_cache.py @@ -53,7 +53,9 @@ class SnapshotCacheTestCase(unittest.TestCase): # before the cache expires returns a resolved deferred. get_result_at_11 = self.cache.get(11, "key") self.assertIsNotNone(get_result_at_11) - self.assertTrue(get_result_at_11.called) + if isinstance(get_result_at_11, Deferred): + # The cache may return the actual result rather than a deferred + self.assertTrue(get_result_at_11.called) # Check that getting the key after the deferred has resolved # after the cache expires returns None -- cgit 1.4.1 From 3a1f3f838862cfd2486773079d01e30c2237e8c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 16:32:07 +0100 Subject: Change slave storage to use new replication interface As the TCP replication uses a slightly different API and streams than the HTTP replication. This breaks HTTP replication. --- synapse/replication/slave/storage/_base.py | 31 +++--------- synapse/replication/slave/storage/account_data.py | 49 +++++++------------ synapse/replication/slave/storage/deviceinbox.py | 23 ++++----- synapse/replication/slave/storage/devices.py | 24 ++++------ synapse/replication/slave/storage/events.py | 57 +++++++---------------- synapse/replication/slave/storage/presence.py | 19 ++++---- synapse/replication/slave/storage/push_rule.py | 23 ++++----- synapse/replication/slave/storage/pushers.py | 16 +++---- synapse/replication/slave/storage/receipts.py | 24 +++++----- synapse/replication/slave/storage/room.py | 11 +++-- tests/replication/slave/storage/_base.py | 30 ++++++++---- 11 files changed, 128 insertions(+), 179 deletions(-) (limited to 'tests') diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index ab133db872..b962641166 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -15,7 +15,6 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.engines import PostgresEngine -from twisted.internet import defer from ._slaved_id_tracker import SlavedIdTracker @@ -34,8 +33,7 @@ class BaseSlavedStore(SQLBaseStore): else: self._cache_id_gen = None - self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache" - self.http_client = hs.get_simple_http_client() + self.hs = hs def stream_positions(self): pos = {} @@ -43,35 +41,20 @@ class BaseSlavedStore(SQLBaseStore): pos["caches"] = self._cache_id_gen.get_current_token() return pos - def process_replication(self, result): - stream = result.get("caches") - if stream: - for row in stream["rows"]: - ( - position, cache_func, keys, invalidation_ts, - ) = row - + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "caches": + self._cache_id_gen.advance(token) + for row in rows: try: - getattr(self, cache_func).invalidate(tuple(keys)) + getattr(self, row.cache_func).invalidate(tuple(row.keys)) except AttributeError: # We probably haven't pulled in the cache in this worker, # which is fine. pass - self._cache_id_gen.advance(int(stream["position"])) - return defer.succeed(None) def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) txn.call_after(self._send_invalidation_poke, cache_func, keys) - @defer.inlineCallbacks def _send_invalidation_poke(self, cache_func, keys): - try: - yield self.http_client.post_json_get_json(self.expire_cache_url, { - "invalidate": [{ - "name": cache_func.__name__, - "keys": list(keys), - }] - }) - except: - logger.exception("Failed to poke on expire_cache") + self.hs.get_tcp_replication().send_invalidate_cache(cache_func, keys) diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index 77c64722c7..efbd87918e 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -69,38 +69,25 @@ class SlavedAccountDataStore(BaseSlavedStore): result["tag_account_data"] = position return result - def process_replication(self, result): - stream = result.get("user_account_data") - if stream: - self._account_data_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, user_id, data_type = row[:3] - self.get_global_account_data_by_type_for_user.invalidate( - (data_type, user_id,) - ) - self.get_account_data_for_user.invalidate((user_id,)) + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "tag_account_data": + self._account_data_id_gen.advance(token) + for row in rows: + self.get_tags_for_user.invalidate((row.user_id,)) self._account_data_stream_cache.entity_has_changed( - user_id, position + row.user_id, token ) - - stream = result.get("room_account_data") - if stream: - self._account_data_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, user_id = row[:2] - self.get_account_data_for_user.invalidate((user_id,)) + elif stream_name == "account_data": + self._account_data_id_gen.advance(token) + for row in rows: + if not row.room_id: + self.get_global_account_data_by_type_for_user.invalidate( + (row.data_type, row.user_id,) + ) + self.get_account_data_for_user.invalidate((row.user_id,)) self._account_data_stream_cache.entity_has_changed( - user_id, position + row.user_id, token ) - - stream = result.get("tag_account_data") - if stream: - self._account_data_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, user_id = row[:2] - self.get_tags_for_user.invalidate((user_id,)) - self._account_data_stream_cache.entity_has_changed( - user_id, position - ) - - return super(SlavedAccountDataStore, self).process_replication(result) + return super(SlavedAccountDataStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index f9102e0d89..6f3fb64770 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -53,21 +53,18 @@ class SlavedDeviceInboxStore(BaseSlavedStore): result["to_device"] = self._device_inbox_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("to_device") - if stream: - self._device_inbox_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - stream_id = row[0] - entity = row[1] - - if entity.startswith("@"): + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "to_device": + self._device_inbox_id_gen.advance(token) + for row in rows: + if row.entity.startswith("@"): self._device_inbox_stream_cache.entity_has_changed( - entity, stream_id + row.entity, token ) else: self._device_federation_outbox_stream_cache.entity_has_changed( - entity, stream_id + row.entity, token ) - - return super(SlavedDeviceInboxStore, self).process_replication(result) + return super(SlavedDeviceInboxStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index ca46aa17b6..4d4a435471 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -51,22 +51,18 @@ class SlavedDeviceStore(BaseSlavedStore): result["device_lists"] = self._device_list_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("device_lists") - if stream: - self._device_list_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - stream_id = row[0] - user_id = row[1] - destination = row[2] - + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "device_lists": + self._device_list_id_gen.advance(token) + for row in rows: self._device_list_stream_cache.entity_has_changed( - user_id, stream_id + row.user_id, token ) - if destination: + if row.destination: self._device_list_federation_stream_cache.entity_has_changed( - destination, stream_id + row.destination, token ) - - return super(SlavedDeviceStore, self).process_replication(result) + return super(SlavedDeviceStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index d4db1e452e..5fd47706ef 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -201,48 +201,25 @@ class SlavedEventStore(BaseSlavedStore): result["backfill"] = -self._backfill_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("events") - if stream: - self._stream_id_gen.advance(int(stream["position"])) - - if stream["rows"]: - logger.info("Got %d event rows", len(stream["rows"])) - - for row in stream["rows"]: - self._process_replication_row( - row, backfilled=False, + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "events": + self._stream_id_gen.advance(token) + for row in rows: + self.invalidate_caches_for_event( + token, row.event_id, row.room_id, row.type, row.state_key, + row.redacts, + backfilled=False, ) - - stream = result.get("backfill") - if stream: - self._backfill_id_gen.advance(-int(stream["position"])) - for row in stream["rows"]: - self._process_replication_row( - row, backfilled=True, + elif stream_name == "backfill": + self._backfill_id_gen.advance(-token) + for row in rows: + self.invalidate_caches_for_event( + -token, row.event_id, row.room_id, row.type, row.state_key, + row.redacts, + backfilled=True, ) - - stream = result.get("forward_ex_outliers") - if stream: - self._stream_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - event_id = row[1] - self._invalidate_get_event_cache(event_id) - - stream = result.get("backward_ex_outliers") - if stream: - self._backfill_id_gen.advance(-int(stream["position"])) - for row in stream["rows"]: - event_id = row[1] - self._invalidate_get_event_cache(event_id) - - return super(SlavedEventStore, self).process_replication(result) - - def _process_replication_row(self, row, backfilled): - stream_ordering = row[0] if not backfilled else -row[0] - self.invalidate_caches_for_event( - stream_ordering, row[1], row[2], row[3], row[4], row[5], - backfilled=backfilled, + return super(SlavedEventStore, self).process_replication_rows( + stream_name, token, rows ) def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index e4a2414d78..dffc80adc3 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -48,15 +48,14 @@ class SlavedPresenceStore(BaseSlavedStore): result["presence"] = position return result - def process_replication(self, result): - stream = result.get("presence") - if stream: - self._presence_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, user_id = row[:2] + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "presence": + self._presence_id_gen.advance(token) + for row in rows: self.presence_stream_cache.entity_has_changed( - user_id, position + row.user_id, token ) - self._get_presence_for_user.invalidate((user_id,)) - - return super(SlavedPresenceStore, self).process_replication(result) + self._get_presence_for_user.invalidate((row.user_id,)) + return super(SlavedPresenceStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 21ceb0213a..83e880fdd2 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -50,18 +50,15 @@ class SlavedPushRuleStore(SlavedEventStore): result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("push_rules") - if stream: - for row in stream["rows"]: - position = row[0] - user_id = row[2] - self.get_push_rules_for_user.invalidate((user_id,)) - self.get_push_rules_enabled_for_user.invalidate((user_id,)) + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "push_rules": + self._push_rules_stream_id_gen.advance(token) + for row in rows: + self.get_push_rules_for_user.invalidate((row.user_id,)) + self.get_push_rules_enabled_for_user.invalidate((row.user_id,)) self.push_rules_stream_cache.entity_has_changed( - user_id, position + row.user_id, token ) - - self._push_rules_stream_id_gen.advance(int(stream["position"])) - - return super(SlavedPushRuleStore, self).process_replication(result) + return super(SlavedPushRuleStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index d88206b3bb..4e8d68ece9 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -40,13 +40,9 @@ class SlavedPusherStore(BaseSlavedStore): result["pushers"] = self._pushers_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("pushers") - if stream: - self._pushers_id_gen.advance(int(stream["position"])) - - stream = result.get("deleted_pushers") - if stream: - self._pushers_id_gen.advance(int(stream["position"])) - - return super(SlavedPusherStore, self).process_replication(result) + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "pushers": + self._pushers_id_gen.advance(token) + return super(SlavedPusherStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index ac9662d399..b371574ece 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -65,20 +65,22 @@ class SlavedReceiptsStore(BaseSlavedStore): result["receipts"] = self._receipts_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("receipts") - if stream: - self._receipts_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, room_id, receipt_type, user_id = row[:4] - self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) - self._receipts_stream_cache.entity_has_changed(room_id, position) - - return super(SlavedReceiptsStore, self).process_replication(result) - def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) self.get_linearized_receipts_for_room.invalidate_many((room_id,)) self.get_last_receipt_event_id_for_user.invalidate( (user_id, room_id, receipt_type) ) + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "receipts": + self._receipts_id_gen.advance(token) + for row in rows: + self.invalidate_caches_for_receipt( + row.room_id, row.receipt_type, row.user_id + ) + self._receipts_stream_cache.entity_has_changed(row.room_id, token) + + return super(SlavedReceiptsStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index 6df9a25ef3..f510384033 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -46,9 +46,10 @@ class RoomStore(BaseSlavedStore): result["public_rooms"] = self._public_room_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("public_rooms") - if stream: - self._public_room_id_gen.advance(int(stream["position"])) + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "public_rooms": + self._public_room_id_gen.advance(token) - return super(RoomStore, self).process_replication(result) + return super(RoomStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index b82868054d..81063f19a1 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor from tests import unittest from mock import Mock, NonCallableMock from tests.utils import setup_test_homeserver -from synapse.replication.resource import ReplicationResource +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory +from synapse.replication.tcp.client import ( + ReplicationClientHandler, ReplicationClientFactory, +) class BaseSlavedStoreTestCase(unittest.TestCase): @@ -33,18 +36,29 @@ class BaseSlavedStoreTestCase(unittest.TestCase): ) self.hs.get_ratelimiter().send_message.return_value = (True, 0) - self.replication = ReplicationResource(self.hs) - self.master_store = self.hs.get_datastore() self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs) self.event_id = 0 + server_factory = ReplicationStreamProtocolFactory(self.hs) + listener = reactor.listenUNIX("\0xxx", server_factory) + self.addCleanup(listener.stopListening) + self.streamer = server_factory.streamer + + self.replication_handler = ReplicationClientHandler(self.slaved_store) + client_factory = ReplicationClientFactory( + self.hs, "client_name", self.replication_handler + ) + client_connector = reactor.connectUNIX("\0xxx", client_factory) + self.addCleanup(client_factory.stopTrying) + self.addCleanup(client_connector.disconnect) + @defer.inlineCallbacks def replicate(self): - streams = self.slaved_store.stream_positions() - writer = yield self.replication.replicate(streams, 100) - result = writer.finish() - yield self.slaved_store.process_replication(result) + yield self.streamer.on_notifier_poke() + d = self.replication_handler.await_sync("replication_test") + self.streamer.send_sync_to_all_connections("replication_test") + yield d @defer.inlineCallbacks def check(self, method, args, expected_result=None): -- cgit 1.4.1 From 85a0d6c7ab915d2848d1899e6af9bd489d456116 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 10:59:27 +0100 Subject: Remove test of replication resource --- tests/replication/test_resource.py | 204 ------------------------------------- 1 file changed, 204 deletions(-) delete mode 100644 tests/replication/test_resource.py (limited to 'tests') diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py deleted file mode 100644 index 429b37e360..0000000000 --- a/tests/replication/test_resource.py +++ /dev/null @@ -1,204 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import contextlib -import json - -from mock import Mock, NonCallableMock -from twisted.internet import defer - -import synapse.types -from synapse.replication.resource import ReplicationResource -from synapse.types import UserID -from tests import unittest -from tests.utils import setup_test_homeserver - - -class ReplicationResourceCase(unittest.TestCase): - @defer.inlineCallbacks - def setUp(self): - self.hs = yield setup_test_homeserver( - "red", - http_client=None, - replication_layer=Mock(), - ratelimiter=NonCallableMock(spec_set=[ - "send_message", - ]), - ) - self.user_id = "@seeing:red" - self.user = UserID.from_string(self.user_id) - - self.hs.get_ratelimiter().send_message.return_value = (True, 0) - - self.resource = ReplicationResource(self.hs) - - @defer.inlineCallbacks - def test_streams(self): - # Passing "-1" returns the current stream positions - code, body = yield self.get(streams="-1") - self.assertEquals(code, 200) - self.assertEquals(body["streams"]["field_names"], ["name", "position"]) - position = body["streams"]["position"] - # Passing the current position returns an empty response after the - # timeout - get = self.get(streams=str(position), timeout="0") - self.hs.clock.advance_time_msec(1) - code, body = yield get - self.assertEquals(code, 200) - self.assertEquals(body, {}) - - @defer.inlineCallbacks - def test_events(self): - get = self.get(events="-1", timeout="0") - yield self.hs.get_handlers().room_creation_handler.create_room( - synapse.types.create_requester(self.user), {} - ) - code, body = yield get - self.assertEquals(code, 200) - self.assertEquals(body["events"]["field_names"], [ - "position", "event_id", "room_id", "type", "state_key", - ]) - - @defer.inlineCallbacks - def test_presence(self): - get = self.get(presence="-1") - yield self.hs.get_presence_handler().set_state( - self.user, {"presence": "online"} - ) - code, body = yield get - self.assertEquals(code, 200) - self.assertEquals(body["presence"]["field_names"], [ - "position", "user_id", "state", "last_active_ts", - "last_federation_update_ts", "last_user_sync_ts", - "status_msg", "currently_active", - ]) - - @defer.inlineCallbacks - def test_typing(self): - room_id = yield self.create_room() - get = self.get(typing="-1") - yield self.hs.get_typing_handler().started_typing( - self.user, self.user, room_id, timeout=2 - ) - code, body = yield get - self.assertEquals(code, 200) - self.assertEquals(body["typing"]["field_names"], [ - "position", "room_id", "typing" - ]) - - @defer.inlineCallbacks - def test_receipts(self): - room_id = yield self.create_room() - event_id = yield self.send_text_message(room_id, "Hello, World") - get = self.get(receipts="-1") - yield self.hs.get_receipts_handler().received_client_receipt( - room_id, "m.read", self.user_id, event_id - ) - code, body = yield get - self.assertEquals(code, 200) - self.assertEquals(body["receipts"]["field_names"], [ - "position", "room_id", "receipt_type", "user_id", "event_id", "data" - ]) - - def _test_timeout(stream): - """Check that a request for the given stream timesout""" - @defer.inlineCallbacks - def test_timeout(self): - get = self.get(**{stream: "-1", "timeout": "0"}) - self.hs.clock.advance_time_msec(1) - code, body = yield get - self.assertEquals(code, 200) - self.assertEquals(body.get("rows", []), []) - test_timeout.__name__ = "test_timeout_%s" % (stream) - return test_timeout - - test_timeout_events = _test_timeout("events") - test_timeout_presence = _test_timeout("presence") - test_timeout_typing = _test_timeout("typing") - test_timeout_receipts = _test_timeout("receipts") - test_timeout_user_account_data = _test_timeout("user_account_data") - test_timeout_room_account_data = _test_timeout("room_account_data") - test_timeout_tag_account_data = _test_timeout("tag_account_data") - test_timeout_backfill = _test_timeout("backfill") - test_timeout_push_rules = _test_timeout("push_rules") - test_timeout_pushers = _test_timeout("pushers") - test_timeout_state = _test_timeout("state") - - @defer.inlineCallbacks - def send_text_message(self, room_id, message): - handler = self.hs.get_handlers().message_handler - event = yield handler.create_and_send_nonmember_event( - synapse.types.create_requester(self.user), - { - "type": "m.room.message", - "content": {"body": "message", "msgtype": "m.text"}, - "room_id": room_id, - "sender": self.user.to_string(), - } - ) - defer.returnValue(event.event_id) - - @defer.inlineCallbacks - def create_room(self): - result = yield self.hs.get_handlers().room_creation_handler.create_room( - synapse.types.create_requester(self.user), {} - ) - defer.returnValue(result["room_id"]) - - @defer.inlineCallbacks - def get(self, **params): - request = NonCallableMock(spec_set=[ - "write", "finish", "setResponseCode", "setHeader", "args", - "method", "processing" - ]) - - request.method = "GET" - request.args = {k: [v] for k, v in params.items()} - - @contextlib.contextmanager - def processing(): - yield - request.processing = processing - - yield self.resource._async_render_GET(request) - self.assertTrue(request.finish.called) - - if request.setResponseCode.called: - response_code = request.setResponseCode.call_args[0][0] - else: - response_code = 200 - - response_json = "".join( - call[0][0] for call in request.write.call_args_list - ) - response_body = json.loads(response_json) - - if response_code == 200: - self.check_response(response_body) - - defer.returnValue((response_code, response_body)) - - def check_response(self, response_body): - for name, stream in response_body.items(): - self.assertIn("field_names", stream) - field_names = stream["field_names"] - self.assertIn("rows", stream) - for row in stream["rows"]: - self.assertEquals( - len(row), len(field_names), - "%s: len(row = %r) == len(field_names = %r)" % ( - name, row, field_names - ) - ) -- cgit 1.4.1 From 3e5a62ecd8839fbfb56aa33b92127822a053ef6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 11:36:11 +0100 Subject: Add more granular event send metrics --- synapse/events/snapshot.py | 3 +++ synapse/handlers/message.py | 10 ++++++++-- synapse/handlers/room_member.py | 1 + synapse/rest/client/v1/room.py | 1 + synapse/storage/events.py | 16 ++++++++++++++++ tests/storage/event_injector.py | 4 ++-- tests/storage/test_events.py | 2 +- 7 files changed, 32 insertions(+), 5 deletions(-) (limited to 'tests') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 6be18880b9..e9a732ff03 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -50,6 +50,7 @@ class EventContext(object): "prev_group", "delta_ids", "prev_state_events", + "app_service", ] def __init__(self): @@ -68,3 +69,5 @@ class EventContext(object): self.delta_ids = None self.prev_state_events = None + + self.app_service = None diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 82a2ade1f6..57265c6d7d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,7 +175,8 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): + def create_event(self, requester, event_dict, token_id=None, txn_id=None, + prev_event_ids=None): """ Given a dict from a client, create a new event. @@ -185,6 +186,7 @@ class MessageHandler(BaseHandler): Adds display names to Join membership events. Args: + requester event_dict (dict): An entire event token_id (str) txn_id (str) @@ -226,6 +228,7 @@ class MessageHandler(BaseHandler): event, context = yield self._create_new_client_event( builder=builder, + requester=requester, prev_event_ids=prev_event_ids, ) @@ -319,6 +322,7 @@ class MessageHandler(BaseHandler): See self.create_event and self.send_nonmember_event. """ event, context = yield self.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id @@ -416,7 +420,7 @@ class MessageHandler(BaseHandler): @measure_func("_create_new_client_event") @defer.inlineCallbacks - def _create_new_client_event(self, builder, prev_event_ids=None): + def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): if prev_event_ids: prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) @@ -456,6 +460,8 @@ class MessageHandler(BaseHandler): state_handler = self.state_handler context = yield state_handler.compute_event_context(builder) + if requester: + context.app_service = requester.app_service if builder.is_state(): builder.prev_state = yield self.store.add_event_hashes( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 28b2c80a93..ab87632d99 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler): content["kind"] = "guest" event, context = yield msg_handler.create_event( + requester, { "type": EventTypes.Member, "content": content, diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c376ab8fd7..cd388770c8 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -164,6 +164,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): else: msg_handler = self.handlers.message_handler event, context = yield msg_handler.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3790419dd..98707d40ee 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -29,6 +29,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.state import resolve_events from synapse.util.caches.descriptors import cached +from synapse.types import get_domain_from_id from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict @@ -49,6 +50,9 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) persist_event_counter = metrics.register_counter("persisted_events") +event_counter = metrics.register_counter( + "persisted_events_sep", labels=["type", "origin_type", "origin_entity"] +) def encode_json(json_object): @@ -370,6 +374,18 @@ class EventsStore(SQLBaseStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + for event, context in chunk: + if context.app_service: + origin_type = "local" + origin_entity = context.app_service.id + elif self.hs.is_mine_id(event.sender): + origin_type = "local" + origin_entity = "*client*" + else: + origin_type = "remote" + origin_entity = get_domain_from_id(event.sender) + + event_counter.inc(event.type, origin_type, origin_entity) @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): diff --git a/tests/storage/event_injector.py b/tests/storage/event_injector.py index 38556da9a7..024ac15069 100644 --- a/tests/storage/event_injector.py +++ b/tests/storage/event_injector.py @@ -27,10 +27,10 @@ class EventInjector: self.event_builder_factory = hs.get_event_builder_factory() @defer.inlineCallbacks - def create_room(self, room): + def create_room(self, room, user): builder = self.event_builder_factory.new({ "type": EventTypes.Create, - "sender": "", + "sender": user.to_string(), "room_id": room.to_string(), "content": {}, }) diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 3762b38e37..14443b53bc 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -50,7 +50,7 @@ class EventsStoreTestCase(unittest.TestCase): # Create something to report room = RoomID.from_string("!abc123:test") user = UserID.from_string("@raccoonlover:test") - yield self.event_injector.create_room(room) + yield self.event_injector.create_room(room, user) self.base_event = yield self._get_last_stream_token() -- cgit 1.4.1 From a7e9d8762ddbcea0fcb7ab87c2c4f4e0d91e639a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 9 May 2017 18:26:54 +0100 Subject: Allow clients to upload one-time-keys with new sigs When a client retries a key upload, don't give an error if the signature has changed (but the key is the same). Fixes https://github.com/vector-im/riot-android/issues/1208, hopefully. --- synapse/handlers/e2e_keys.py | 70 ++++++++++++++++++++++----- synapse/storage/end_to_end_keys.py | 47 ++++++++++-------- tests/handlers/test_e2e_keys.py | 98 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 182 insertions(+), 33 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index c2b38d72a9..9d994a8f71 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -288,19 +288,8 @@ class E2eKeysHandler(object): one_time_keys = keys.get("one_time_keys", None) if one_time_keys: - logger.info( - "Adding %d one_time_keys for device %r for user %r at %d", - len(one_time_keys), device_id, user_id, time_now - ) - key_list = [] - for key_id, key_json in one_time_keys.items(): - algorithm, key_id = key_id.split(":") - key_list.append(( - algorithm, key_id, encode_canonical_json(key_json) - )) - - yield self.store.add_e2e_one_time_keys( - user_id, device_id, time_now, key_list + yield self._upload_one_time_keys_for_user( + user_id, device_id, time_now, one_time_keys, ) # the device should have been registered already, but it may have been @@ -313,3 +302,58 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue({"one_time_key_counts": result}) + + @defer.inlineCallbacks + def _upload_one_time_keys_for_user(self, user_id, device_id, time_now, + one_time_keys): + logger.info( + "Adding one_time_keys %r for device %r for user %r at %d", + one_time_keys.keys(), device_id, user_id, time_now, + ) + + # make a list of (alg, id, key) tuples + key_list = [] + for key_id, key_obj in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append(( + algorithm, key_id, key_obj + )) + + # First we check if we have already persisted any of the keys. + existing_key_map = yield self.store.get_e2e_one_time_keys( + user_id, device_id, [k_id for _, k_id, _ in key_list] + ) + + new_keys = [] # Keys that we need to insert. (alg, id, json) tuples. + for algorithm, key_id, key in key_list: + ex_json = existing_key_map.get((algorithm, key_id), None) + if ex_json: + if not _one_time_keys_match(ex_json, key): + raise SynapseError( + 400, + ("One time key %s:%s already exists. " + "Old key: %s; new key: %r") % + (algorithm, key_id, ex_json, key) + ) + else: + new_keys.append((algorithm, key_id, encode_canonical_json(key))) + + yield self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, new_keys + ) + + +def _one_time_keys_match(old_key_json, new_key): + old_key = json.loads(old_key_json) + + # if either is a string rather than an object, they must match exactly + if not isinstance(old_key, dict) or not isinstance(new_key, dict): + return old_key == new_key + + # otherwise, we strip off the 'signatures' if any, because it's legitimate + # for different upload attempts to have different signatures. + old_key.pop("signatures", None) + new_key_copy = dict(new_key) + new_key_copy.pop("signatures", None) + + return old_key == new_key_copy diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index c96dae352d..e00f31da2b 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -14,7 +14,6 @@ # limitations under the License. from twisted.internet import defer -from synapse.api.errors import SynapseError from synapse.util.caches.descriptors import cached from canonicaljson import encode_canonical_json @@ -124,18 +123,24 @@ class EndToEndKeyStore(SQLBaseStore): return result @defer.inlineCallbacks - def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list): - """Insert some new one time keys for a device. + def get_e2e_one_time_keys(self, user_id, device_id, key_ids): + """Retrieve a number of one-time keys for a user - Checks if any of the keys are already inserted, if they are then check - if they match. If they don't then we raise an error. + Args: + user_id(str): id of user to get keys for + device_id(str): id of device to get keys for + key_ids(list[str]): list of key ids (excluding algorithm) to + retrieve + + Returns: + deferred resolving to Dict[(str, str), str]: map from (algorithm, + key_id) to json string for key """ - # First we check if we have already persisted any of the keys. rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", - iterable=[key_id for _, key_id, _ in key_list], + iterable=key_ids, retcols=("algorithm", "key_id", "key_json",), keyvalues={ "user_id": user_id, @@ -144,20 +149,22 @@ class EndToEndKeyStore(SQLBaseStore): desc="add_e2e_one_time_keys_check", ) - existing_key_map = { + defer.returnValue({ (row["algorithm"], row["key_id"]): row["key_json"] for row in rows - } - - new_keys = [] # Keys that we need to insert - for algorithm, key_id, json_bytes in key_list: - ex_bytes = existing_key_map.get((algorithm, key_id), None) - if ex_bytes: - if json_bytes != ex_bytes: - raise SynapseError( - 400, "One time key with key_id %r already exists" % (key_id,) - ) - else: - new_keys.append((algorithm, key_id, json_bytes)) + }) + + @defer.inlineCallbacks + def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): + """Insert some new one time keys for a device. Errors if any of the + keys already exist. + + Args: + user_id(str): id of user to get keys for + device_id(str): id of device to get keys for + time_now(long): insertion time to record (ms since epoch) + new_keys(iterable[(str, str, str)]: keys to add - each a tuple of + (algorithm, key_id, key json) + """ def _add_e2e_one_time_keys(txn): # We are protected from race between lookup and insertion due to diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 878a54dc34..f10a80a8e1 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -14,6 +14,7 @@ # limitations under the License. import mock +from synapse.api import errors from twisted.internet import defer import synapse.api.errors @@ -44,3 +45,100 @@ class E2eKeysHandlerTestCase(unittest.TestCase): local_user = "@boris:" + self.hs.hostname res = yield self.handler.query_local_devices({local_user: None}) self.assertDictEqual(res, {local_user: {}}) + + @defer.inlineCallbacks + def test_reupload_one_time_keys(self): + """we should be able to re-upload the same keys""" + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + "alg2:k2": { + "key": "key2", + "signatures": {"k1": "sig1"} + }, + "alg2:k3": { + "key": "key3", + }, + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + # we should be able to change the signature without a problem + keys["alg2:k2"]["signatures"]["k1"] = "sig2" + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + @defer.inlineCallbacks + def test_change_one_time_keys(self): + """attempts to change one-time-keys should be rejected""" + + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + "alg2:k2": { + "key": "key2", + "signatures": {"k1": "sig1"} + }, + "alg2:k3": { + "key": "key3", + }, + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg1:k1": "key2"}}, + ) + self.fail("No error when changing string key") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg2:k3": "key2"}}, + ) + self.fail("No error when replacing dict key with string") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, { + "one_time_keys": {"alg1:k1": {"key": "key"}} + }, + ) + self.fail("No error when replacing string key with dict") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, { + "one_time_keys": { + "alg2:k2": { + "key": "key3", + "signatures": {"k1": "sig1"}, + } + }, + }, + ) + self.fail("No error when replacing dict key") + except errors.SynapseError: + pass -- cgit 1.4.1 From de042b3b885aba6b1508ca50e033fb7a95893553 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 9 May 2017 19:01:39 +0100 Subject: Do some logging when one-time-keys get claimed might help us figure out if https://github.com/vector-im/riot-web/issues/3868 has happened. --- synapse/federation/federation_server.py | 10 ++++++++++ synapse/handlers/e2e_keys.py | 10 ++++++++++ tests/handlers/test_e2e_keys.py | 34 +++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+) (limited to 'tests') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bc20b9c201..51e3fdea06 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -440,6 +440,16 @@ class FederationServer(FederationBase): key_id: json.loads(json_bytes) } + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({"one_time_keys": json_result}) @defer.inlineCallbacks diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 9d994a8f71..73921a5307 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -262,6 +262,16 @@ class E2eKeysHandler(object): for destination in remote_queries ])) + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({ "one_time_keys": json_result, "failures": failures diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index f10a80a8e1..19f5ed6bce 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -142,3 +142,37 @@ class E2eKeysHandlerTestCase(unittest.TestCase): self.fail("No error when replacing dict key") except errors.SynapseError: pass + + @unittest.DEBUG + @defer.inlineCallbacks + def test_claim_one_time_key(self): + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1} + }) + + res2 = yield self.handler.claim_one_time_keys({ + "one_time_keys": { + local_user: { + device_id: "alg1" + } + } + }, timeout=None) + self.assertEqual(res2, { + "failures": {}, + "one_time_keys": { + local_user: { + device_id: { + "alg1:k1": "key1" + } + } + } + }) -- cgit 1.4.1