summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-03-01 14:49:41 +0000
committerMark Haines <mark.haines@matrix.org>2016-03-01 14:49:41 +0000
commit60a0f81c7a2da86bf959227a440e3f7a2b727bb5 (patch)
tree68a0f9fe80d57f529d2aa2c9f355e935389fd0d6 /tests
parentMerge pull request #611 from matrix-org/erikj/expiring_cache_size (diff)
downloadsynapse-60a0f81c7a2da86bf959227a440e3f7a2b727bb5.tar.xz
Add a /replication API for extracting the updates that happened on
synapse

This is necessary for replicating the data in synapse to be visible to a
separate service because presence and typing notifications aren't stored
in a database so won't be visible to another process.

This API can be used to either get the raw data by requesting the tables
themselves or to just receive notifications for updates by following the
streams meta-stream.

Returns updates for each table requested a JSON array of arrays with a
row for each row in the table.

Each table is prefixed by a header row with the: name of the table,
current stream_id position for the table, number of rows, number of
columns and the names of the columns.
This is followed by the rows that have been added to the server since
the requester last asked.

The API has a timeout and is hooked up to the notifier so that a slave
can long poll for updates.
Diffstat (limited to 'tests')
-rw-r--r--tests/replication/__init__.py14
-rw-r--r--tests/replication/test_resource.py179
-rw-r--r--tests/utils.py5
3 files changed, 196 insertions, 2 deletions
diff --git a/tests/replication/__init__.py b/tests/replication/__init__.py
new file mode 100644
index 0000000000..b7df13c9ee
--- /dev/null
+++ b/tests/replication/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py
new file mode 100644
index 0000000000..38daaf87e2
--- /dev/null
+++ b/tests/replication/test_resource.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.resource import ReplicationResource
+from synapse.types import Requester, UserID
+
+from twisted.internet import defer
+from tests import unittest
+from tests.utils import setup_test_homeserver
+from mock import Mock, NonCallableMock
+import json
+import contextlib
+
+
+class ReplicationResourceCase(unittest.TestCase):
+    @defer.inlineCallbacks
+    def setUp(self):
+        self.hs = yield setup_test_homeserver(
+            "red",
+            http_client=None,
+            replication_layer=Mock(),
+            ratelimiter=NonCallableMock(spec_set=[
+                "send_message",
+            ]),
+        )
+        self.user = UserID.from_string("@seeing:red")
+
+        self.hs.get_ratelimiter().send_message.return_value = (True, 0)
+
+        self.resource = ReplicationResource(self.hs)
+
+    @defer.inlineCallbacks
+    def test_streams(self):
+        # Passing "-1" returns the current stream positions
+        code, body = yield self.get(streams="-1")
+        self.assertEquals(code, 200)
+        self.assertEquals(body["streams"]["field_names"], ["name", "position"])
+        position = body["streams"]["position"]
+        # Passing the current position returns an empty response after the
+        # timeout
+        get = self.get(streams=str(position), timeout="0")
+        self.hs.clock.advance_time_msec(1)
+        code, body = yield get
+        self.assertEquals(code, 200)
+        self.assertEquals(body, {})
+
+    @defer.inlineCallbacks
+    def test_events(self):
+        get = self.get(events="-1", timeout="0")
+        yield self.hs.get_handlers().room_creation_handler.create_room(
+            Requester(self.user, "", False), {}
+        )
+        code, body = yield get
+        self.assertEquals(code, 200)
+        self.assertEquals(body["events"]["field_names"], [
+            "position", "internal", "json"
+        ])
+
+    @defer.inlineCallbacks
+    def test_presence(self):
+        get = self.get(presence="-1")
+        yield self.hs.get_handlers().presence_handler.set_state(
+            self.user, {"presence": "online"}
+        )
+        code, body = yield get
+        self.assertEquals(code, 200)
+        self.assertEquals(body["presence"]["field_names"], [
+            "position", "user_id", "state", "last_active_ts",
+            "last_federation_update_ts", "last_user_sync_ts",
+            "status_msg", "currently_active",
+        ])
+
+    @defer.inlineCallbacks
+    def test_typing(self):
+        room_id = yield self.create_room()
+        get = self.get(typing="-1")
+        yield self.hs.get_handlers().typing_notification_handler.started_typing(
+            self.user, self.user, room_id, timeout=2
+        )
+        code, body = yield get
+        self.assertEquals(code, 200)
+        self.assertEquals(body["typing"]["field_names"], [
+            "position", "room_id", "typing"
+        ])
+
+    @defer.inlineCallbacks
+    def test_receipts(self):
+        room_id = yield self.create_room()
+        event_id = yield self.send_text_message(room_id, "Hello, World")
+        get = self.get(receipts="-1")
+        yield self.hs.get_handlers().receipts_handler.received_client_receipt(
+            room_id, "m.read", self.user.to_string(), event_id
+        )
+        code, body = yield get
+        self.assertEquals(code, 200)
+        self.assertEquals(body["receipts"]["field_names"], [
+            "position", "room_id", "receipt_type", "user_id", "event_id", "data"
+        ])
+
+    def _test_timeout(stream):
+        """Check that a request for the given stream timesout"""
+        @defer.inlineCallbacks
+        def test_timeout(self):
+            get = self.get(**{stream: "-1", "timeout": "0"})
+            self.hs.clock.advance_time_msec(1)
+            code, body = yield get
+            self.assertEquals(code, 200)
+            self.assertEquals(body, {})
+        test_timeout.__name__ = "test_timeout_%s" % (stream)
+        return test_timeout
+
+    test_timeout_events = _test_timeout("events")
+    test_timeout_presence = _test_timeout("presence")
+    test_timeout_typing = _test_timeout("typing")
+    test_timeout_receipts = _test_timeout("receipts")
+    test_timeout_user_account_data = _test_timeout("user_account_data")
+    test_timeout_room_account_data = _test_timeout("room_account_data")
+    test_timeout_tag_account_data = _test_timeout("tag_account_data")
+    test_timeout_backfill = _test_timeout("backfill")
+
+    @defer.inlineCallbacks
+    def send_text_message(self, room_id, message):
+        handler = self.hs.get_handlers().message_handler
+        event = yield handler.create_and_send_nonmember_event({
+            "type": "m.room.message",
+            "content": {"body": "message", "msgtype": "m.text"},
+            "room_id": room_id,
+            "sender": self.user.to_string(),
+        })
+        defer.returnValue(event.event_id)
+
+    @defer.inlineCallbacks
+    def create_room(self):
+        result = yield self.hs.get_handlers().room_creation_handler.create_room(
+            Requester(self.user, "", False), {}
+        )
+        defer.returnValue(result["room_id"])
+
+    @defer.inlineCallbacks
+    def get(self, **params):
+        request = NonCallableMock(spec_set=[
+            "write", "finish", "setResponseCode", "setHeader", "args",
+            "method", "processing"
+        ])
+
+        request.method = "GET"
+        request.args = {k: [v] for k, v in params.items()}
+
+        @contextlib.contextmanager
+        def processing():
+            yield
+        request.processing = processing
+
+        yield self.resource._async_render_GET(request)
+        self.assertTrue(request.finish.called)
+
+        if request.setResponseCode.called:
+            response_code = request.setResponseCode.call_args[0][0]
+        else:
+            response_code = 200
+
+        response_json = "".join(
+            call[0][0] for call in request.write.call_args_list
+        )
+        response_body = json.loads(response_json)
+
+        defer.returnValue((response_code, response_body))
diff --git a/tests/utils.py b/tests/utils.py
index bf7a31ff9e..dfbee5c23a 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -239,9 +239,10 @@ class MockClock(object):
     def looping_call(self, function, interval):
         pass
 
-    def cancel_call_later(self, timer):
+    def cancel_call_later(self, timer, ignore_errs=False):
         if timer[2]:
-            raise Exception("Cannot cancel an expired timer")
+            if not ignore_errs:
+                raise Exception("Cannot cancel an expired timer")
 
         timer[2] = True
         self.timers = [t for t in self.timers if t != timer]