diff options
Diffstat (limited to 'tests/replication')
-rw-r--r-- | tests/replication/slave/__init__.py | 14 | ||||
-rw-r--r-- | tests/replication/slave/storage/__init__.py | 14 | ||||
-rw-r--r-- | tests/replication/slave/storage/_base.py | 55 | ||||
-rw-r--r-- | tests/replication/slave/storage/test_account_data.py | 56 | ||||
-rw-r--r-- | tests/replication/slave/storage/test_events.py | 365 | ||||
-rw-r--r-- | tests/replication/slave/storage/test_receipts.py | 39 | ||||
-rw-r--r-- | tests/replication/test_resource.py | 34 |
7 files changed, 572 insertions, 5 deletions
diff --git a/tests/replication/slave/__init__.py b/tests/replication/slave/__init__.py new file mode 100644 index 0000000000..b7df13c9ee --- /dev/null +++ b/tests/replication/slave/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/replication/slave/storage/__init__.py b/tests/replication/slave/storage/__init__.py new file mode 100644 index 0000000000..b7df13c9ee --- /dev/null +++ b/tests/replication/slave/storage/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py new file mode 100644 index 0000000000..1f13cd0bc0 --- /dev/null +++ b/tests/replication/slave/storage/_base.py @@ -0,0 +1,55 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from tests import unittest + +from mock import Mock, NonCallableMock +from tests.utils import setup_test_homeserver +from synapse.replication.resource import ReplicationResource + + +class BaseSlavedStoreTestCase(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + self.hs = yield setup_test_homeserver( + "blue", + http_client=None, + replication_layer=Mock(), + ratelimiter=NonCallableMock(spec_set=[ + "send_message", + ]), + ) + self.hs.get_ratelimiter().send_message.return_value = (True, 0) + + self.replication = ReplicationResource(self.hs) + + self.master_store = self.hs.get_datastore() + self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs) + self.event_id = 0 + + @defer.inlineCallbacks + def replicate(self): + streams = self.slaved_store.stream_positions() + result = yield self.replication.replicate(streams, 100) + yield self.slaved_store.process_replication(result) + + @defer.inlineCallbacks + def check(self, method, args, expected_result=None): + master_result = yield getattr(self.master_store, method)(*args) + slaved_result = yield getattr(self.slaved_store, method)(*args) + if expected_result is not None: + self.assertEqual(master_result, expected_result) + self.assertEqual(slaved_result, expected_result) + self.assertEqual(master_result, slaved_result) diff --git a/tests/replication/slave/storage/test_account_data.py b/tests/replication/slave/storage/test_account_data.py new file mode 100644 index 0000000000..da54d478ce --- /dev/null +++ b/tests/replication/slave/storage/test_account_data.py @@ -0,0 +1,56 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ._base import BaseSlavedStoreTestCase + +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore + +from twisted.internet import defer + +USER_ID = "@feeling:blue" +TYPE = "my.type" + + +class SlavedAccountDataStoreTestCase(BaseSlavedStoreTestCase): + + STORE_TYPE = SlavedAccountDataStore + + @defer.inlineCallbacks + def test_user_account_data(self): + yield self.master_store.add_account_data_for_user( + USER_ID, TYPE, {"a": 1} + ) + yield self.replicate() + yield self.check( + "get_global_account_data_by_type_for_user", + [TYPE, USER_ID], {"a": 1} + ) + yield self.check( + "get_global_account_data_by_type_for_users", + [TYPE, [USER_ID]], {USER_ID: {"a": 1}} + ) + + yield self.master_store.add_account_data_for_user( + USER_ID, TYPE, {"a": 2} + ) + yield self.replicate() + yield self.check( + "get_global_account_data_by_type_for_user", + [TYPE, USER_ID], {"a": 2} + ) + yield self.check( + "get_global_account_data_by_type_for_users", + [TYPE, [USER_ID]], {USER_ID: {"a": 2}} + ) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py new file mode 100644 index 0000000000..17587fda00 --- /dev/null +++ b/tests/replication/slave/storage/test_events.py @@ -0,0 +1,365 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStoreTestCase + +from synapse.events import FrozenEvent, _EventInternalMetadata +from synapse.events.snapshot import EventContext +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.storage.roommember import RoomsForUser + +from twisted.internet import defer + + +USER_ID = "@feeling:blue" +USER_ID_2 = "@bright:blue" +OUTLIER = {"outlier": True} +ROOM_ID = "!room:blue" + + +def dict_equals(self, other): + return self.__dict__ == other.__dict__ + + +def patch__eq__(cls): + eq = getattr(cls, "__eq__", None) + cls.__eq__ = dict_equals + + def unpatch(): + if eq is not None: + cls.__eq__ = eq + return unpatch + + +class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): + + STORE_TYPE = SlavedEventStore + + def setUp(self): + # Patch up the equality operator for events so that we can check + # whether lists of events match using assertEquals + self.unpatches = [ + patch__eq__(_EventInternalMetadata), + patch__eq__(FrozenEvent), + ] + return super(SlavedEventStoreTestCase, self).setUp() + + def tearDown(self): + [unpatch() for unpatch in self.unpatches] + + @defer.inlineCallbacks + def test_room_name_and_aliases(self): + create = yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.persist(type="m.room.member", key=USER_ID, membership="join") + yield self.persist(type="m.room.name", key="", name="name1") + yield self.persist( + type="m.room.aliases", key="blue", aliases=["#1:blue"] + ) + yield self.replicate() + yield self.check( + "get_room_name_and_aliases", (ROOM_ID,), ("name1", ["#1:blue"]) + ) + + # Set the room name. + yield self.persist(type="m.room.name", key="", name="name2") + yield self.replicate() + yield self.check( + "get_room_name_and_aliases", (ROOM_ID,), ("name2", ["#1:blue"]) + ) + + # Set the room aliases. + yield self.persist( + type="m.room.aliases", key="blue", aliases=["#2:blue"] + ) + yield self.replicate() + yield self.check( + "get_room_name_and_aliases", (ROOM_ID,), ("name2", ["#2:blue"]) + ) + + # Leave and join the room clobbering the state. + yield self.persist(type="m.room.member", key=USER_ID, membership="leave") + yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + reset_state=[create] + ) + yield self.replicate() + + yield self.check( + "get_room_name_and_aliases", (ROOM_ID,), (None, []) + ) + + @defer.inlineCallbacks + def test_room_members(self): + create = yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.replicate() + yield self.check("get_rooms_for_user", (USER_ID,), []) + yield self.check("get_users_in_room", (ROOM_ID,), []) + + # Join the room. + join = yield self.persist(type="m.room.member", key=USER_ID, membership="join") + yield self.replicate() + yield self.check("get_rooms_for_user", (USER_ID,), [RoomsForUser( + room_id=ROOM_ID, + sender=USER_ID, + membership="join", + event_id=join.event_id, + stream_ordering=join.internal_metadata.stream_ordering, + )]) + yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID]) + + # Leave the room. + yield self.persist(type="m.room.member", key=USER_ID, membership="leave") + yield self.replicate() + yield self.check("get_rooms_for_user", (USER_ID,), []) + yield self.check("get_users_in_room", (ROOM_ID,), []) + + # Add some other user to the room. + join = yield self.persist(type="m.room.member", key=USER_ID_2, membership="join") + yield self.replicate() + yield self.check("get_rooms_for_user", (USER_ID_2,), [RoomsForUser( + room_id=ROOM_ID, + sender=USER_ID, + membership="join", + event_id=join.event_id, + stream_ordering=join.internal_metadata.stream_ordering, + )]) + yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2]) + + # Join the room clobbering the state. + # This should remove any evidence of the other user being in the room. + yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + reset_state=[create] + ) + yield self.replicate() + yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID]) + yield self.check("get_rooms_for_user", (USER_ID_2,), []) + + @defer.inlineCallbacks + def test_get_latest_event_ids_in_room(self): + create = yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.replicate() + yield self.check( + "get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id] + ) + + join = yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + prev_events=[(create.event_id, {})], + ) + yield self.replicate() + yield self.check( + "get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id] + ) + + @defer.inlineCallbacks + def test_get_current_state(self): + # Create the room. + create = yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.replicate() + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID), [] + ) + + # Join the room. + join1 = yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + ) + yield self.replicate() + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID), + [join1] + ) + + # Add some other user to the room. + join2 = yield self.persist( + type="m.room.member", key=USER_ID_2, membership="join", + ) + yield self.replicate() + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID_2), + [join2] + ) + + # Leave the room, then rejoin the room clobbering state. + yield self.persist(type="m.room.member", key=USER_ID, membership="leave") + join3 = yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + reset_state=[create] + ) + yield self.replicate() + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID_2), + [] + ) + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID), + [join3] + ) + + @defer.inlineCallbacks + def test_redactions(self): + yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.persist(type="m.room.member", key=USER_ID, membership="join") + + msg = yield self.persist( + type="m.room.message", msgtype="m.text", body="Hello" + ) + yield self.replicate() + yield self.check("get_event", [msg.event_id], msg) + + redaction = yield self.persist( + type="m.room.redaction", redacts=msg.event_id + ) + yield self.replicate() + + msg_dict = msg.get_dict() + msg_dict["content"] = {} + msg_dict["unsigned"]["redacted_by"] = redaction.event_id + msg_dict["unsigned"]["redacted_because"] = redaction + redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) + yield self.check("get_event", [msg.event_id], redacted) + + @defer.inlineCallbacks + def test_backfilled_redactions(self): + yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.persist(type="m.room.member", key=USER_ID, membership="join") + + msg = yield self.persist( + type="m.room.message", msgtype="m.text", body="Hello" + ) + yield self.replicate() + yield self.check("get_event", [msg.event_id], msg) + + redaction = yield self.persist( + type="m.room.redaction", redacts=msg.event_id, backfill=True + ) + yield self.replicate() + + msg_dict = msg.get_dict() + msg_dict["content"] = {} + msg_dict["unsigned"]["redacted_by"] = redaction.event_id + msg_dict["unsigned"]["redacted_because"] = redaction + redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) + yield self.check("get_event", [msg.event_id], redacted) + + @defer.inlineCallbacks + def test_invites(self): + yield self.check("get_invited_rooms_for_user", [USER_ID_2], []) + event = yield self.persist( + type="m.room.member", key=USER_ID_2, membership="invite" + ) + yield self.replicate() + yield self.check("get_invited_rooms_for_user", [USER_ID_2], [RoomsForUser( + ROOM_ID, USER_ID, "invite", event.event_id, + event.internal_metadata.stream_ordering + )]) + + @defer.inlineCallbacks + def test_push_actions_for_user(self): + yield self.persist(type="m.room.create", creator=USER_ID) + yield self.persist(type="m.room.join", key=USER_ID, membership="join") + yield self.persist( + type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join" + ) + event1 = yield self.persist( + type="m.room.message", msgtype="m.text", body="hello" + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 0, "notify_count": 0} + ) + + yield self.persist( + type="m.room.message", msgtype="m.text", body="world", + push_actions=[(USER_ID_2, ["notify"])], + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 0, "notify_count": 1} + ) + + yield self.persist( + type="m.room.message", msgtype="m.text", body="world", + push_actions=[(USER_ID_2, [ + "notify", {"set_tweak": "highlight", "value": True} + ])], + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 1, "notify_count": 2} + ) + + event_id = 0 + + @defer.inlineCallbacks + def persist( + self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, internal={}, + state=None, reset_state=False, backfill=False, + depth=None, prev_events=[], auth_events=[], prev_state=[], redacts=None, + push_actions=[], + **content + ): + """ + Returns: + synapse.events.FrozenEvent: The event that was persisted. + """ + if depth is None: + depth = self.event_id + + event_dict = { + "sender": sender, + "type": type, + "content": content, + "event_id": "$%d:blue" % (self.event_id,), + "room_id": room_id, + "depth": depth, + "origin_server_ts": self.event_id, + "prev_events": prev_events, + "auth_events": auth_events, + } + if key is not None: + event_dict["state_key"] = key + event_dict["prev_state"] = prev_state + + if redacts is not None: + event_dict["redacts"] = redacts + + event = FrozenEvent(event_dict, internal_metadata_dict=internal) + + self.event_id += 1 + + context = EventContext(current_state=state) + context.push_actions = push_actions + + ordering = None + if backfill: + yield self.master_store.persist_events( + [(event, context)], backfilled=True + ) + else: + ordering, _ = yield self.master_store.persist_event( + event, context, current_state=reset_state + ) + + if ordering: + event.internal_metadata.stream_ordering = ordering + + defer.returnValue(event) diff --git a/tests/replication/slave/storage/test_receipts.py b/tests/replication/slave/storage/test_receipts.py new file mode 100644 index 0000000000..6624fe4eea --- /dev/null +++ b/tests/replication/slave/storage/test_receipts.py @@ -0,0 +1,39 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStoreTestCase + +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore + +from twisted.internet import defer + +USER_ID = "@feeling:blue" +ROOM_ID = "!room:blue" +EVENT_ID = "$event:blue" + + +class SlavedReceiptTestCase(BaseSlavedStoreTestCase): + + STORE_TYPE = SlavedReceiptsStore + + @defer.inlineCallbacks + def test_receipt(self): + yield self.check("get_receipts_for_user", [USER_ID, "m.read"], {}) + yield self.master_store.insert_receipt( + ROOM_ID, "m.read", USER_ID, [EVENT_ID], {} + ) + yield self.replicate() + yield self.check("get_receipts_for_user", [USER_ID, "m.read"], { + ROOM_ID: EVENT_ID + }) diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index f4b5fb3328..842e3d29d7 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -58,21 +58,27 @@ class ReplicationResourceCase(unittest.TestCase): self.assertEquals(body, {}) @defer.inlineCallbacks - def test_events(self): - get = self.get(events="-1", timeout="0") + def test_events_and_state(self): + get = self.get(events="-1", state="-1", timeout="0") yield self.hs.get_handlers().room_creation_handler.create_room( Requester(self.user, "", False), {} ) code, body = yield get self.assertEquals(code, 200) self.assertEquals(body["events"]["field_names"], [ - "position", "internal", "json" + "position", "internal", "json", "state_group" + ]) + self.assertEquals(body["state_groups"]["field_names"], [ + "position", "room_id", "event_id" + ]) + self.assertEquals(body["state_group_state"]["field_names"], [ + "position", "type", "state_key", "event_id" ]) @defer.inlineCallbacks def test_presence(self): get = self.get(presence="-1") - yield self.hs.get_handlers().presence_handler.set_state( + yield self.hs.get_presence_handler().set_state( self.user, {"presence": "online"} ) code, body = yield get @@ -87,7 +93,7 @@ class ReplicationResourceCase(unittest.TestCase): def test_typing(self): room_id = yield self.create_room() get = self.get(typing="-1") - yield self.hs.get_handlers().typing_notification_handler.started_typing( + yield self.hs.get_typing_handler().started_typing( self.user, self.user, room_id, timeout=2 ) code, body = yield get @@ -132,6 +138,7 @@ class ReplicationResourceCase(unittest.TestCase): test_timeout_backfill = _test_timeout("backfill") test_timeout_push_rules = _test_timeout("push_rules") test_timeout_pushers = _test_timeout("pushers") + test_timeout_state = _test_timeout("state") @defer.inlineCallbacks def send_text_message(self, room_id, message): @@ -182,4 +189,21 @@ class ReplicationResourceCase(unittest.TestCase): ) response_body = json.loads(response_json) + if response_code == 200: + self.check_response(response_body) + defer.returnValue((response_code, response_body)) + + def check_response(self, response_body): + for name, stream in response_body.items(): + self.assertIn("field_names", stream) + field_names = stream["field_names"] + self.assertIn("rows", stream) + self.assertTrue(stream["rows"]) + for row in stream["rows"]: + self.assertEquals( + len(row), len(field_names), + "%s: len(row = %r) == len(field_names = %r)" % ( + name, row, field_names + ) + ) |