diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index cc6512ccc7..85a970a6c9 100644
--- a/tests/handlers/test_device.py
+++ b/tests/handlers/test_device.py
@@ -15,22 +15,30 @@
from twisted.internet import defer
-from synapse.handlers.device import DeviceHandler
-from tests import unittest
-from tests.utils import setup_test_homeserver
+import synapse.api.errors
+import synapse.handlers.device
+import synapse.storage
+from synapse import types
+from tests import unittest, utils
-class DeviceHandlers(object):
- def __init__(self, hs):
- self.device_handler = DeviceHandler(hs)
+user1 = "@boris:aaa"
+user2 = "@theresa:bbb"
class DeviceTestCase(unittest.TestCase):
+ def __init__(self, *args, **kwargs):
+ super(DeviceTestCase, self).__init__(*args, **kwargs)
+ self.store = None # type: synapse.storage.DataStore
+ self.handler = None # type: synapse.handlers.device.DeviceHandler
+ self.clock = None # type: utils.MockClock
+
@defer.inlineCallbacks
def setUp(self):
- self.hs = yield setup_test_homeserver(handlers=None)
- self.hs.handlers = handlers = DeviceHandlers(self.hs)
- self.handler = handlers.device_handler
+ hs = yield utils.setup_test_homeserver(handlers=None)
+ self.handler = synapse.handlers.device.DeviceHandler(hs)
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
@defer.inlineCallbacks
def test_device_is_created_if_doesnt_exist(self):
@@ -73,3 +81,104 @@ class DeviceTestCase(unittest.TestCase):
dev = yield self.handler.store.get_device("theresa", device_id)
self.assertEqual(dev["display_name"], "display")
+
+ @defer.inlineCallbacks
+ def test_get_devices_by_user(self):
+ yield self._record_users()
+
+ res = yield self.handler.get_devices_by_user(user1)
+ self.assertEqual(3, len(res))
+ device_map = {
+ d["device_id"]: d for d in res
+ }
+ self.assertDictContainsSubset({
+ "user_id": user1,
+ "device_id": "xyz",
+ "display_name": "display 0",
+ "last_seen_ip": None,
+ "last_seen_ts": None,
+ }, device_map["xyz"])
+ self.assertDictContainsSubset({
+ "user_id": user1,
+ "device_id": "fco",
+ "display_name": "display 1",
+ "last_seen_ip": "ip1",
+ "last_seen_ts": 1000000,
+ }, device_map["fco"])
+ self.assertDictContainsSubset({
+ "user_id": user1,
+ "device_id": "abc",
+ "display_name": "display 2",
+ "last_seen_ip": "ip3",
+ "last_seen_ts": 3000000,
+ }, device_map["abc"])
+
+ @defer.inlineCallbacks
+ def test_get_device(self):
+ yield self._record_users()
+
+ res = yield self.handler.get_device(user1, "abc")
+ self.assertDictContainsSubset({
+ "user_id": user1,
+ "device_id": "abc",
+ "display_name": "display 2",
+ "last_seen_ip": "ip3",
+ "last_seen_ts": 3000000,
+ }, res)
+
+ @defer.inlineCallbacks
+ def test_delete_device(self):
+ yield self._record_users()
+
+ # delete the device
+ yield self.handler.delete_device(user1, "abc")
+
+ # check the device was deleted
+ with self.assertRaises(synapse.api.errors.NotFoundError):
+ yield self.handler.get_device(user1, "abc")
+
+ # we'd like to check the access token was invalidated, but that's a
+ # bit of a PITA.
+
+ @defer.inlineCallbacks
+ def test_update_device(self):
+ yield self._record_users()
+
+ update = {"display_name": "new display"}
+ yield self.handler.update_device(user1, "abc", update)
+
+ res = yield self.handler.get_device(user1, "abc")
+ self.assertEqual(res["display_name"], "new display")
+
+ @defer.inlineCallbacks
+ def test_update_unknown_device(self):
+ update = {"display_name": "new_display"}
+ with self.assertRaises(synapse.api.errors.NotFoundError):
+ yield self.handler.update_device("user_id", "unknown_device_id",
+ update)
+
+ @defer.inlineCallbacks
+ def _record_users(self):
+ # check this works for both devices which have a recorded client_ip,
+ # and those which don't.
+ yield self._record_user(user1, "xyz", "display 0")
+ yield self._record_user(user1, "fco", "display 1", "token1", "ip1")
+ yield self._record_user(user1, "abc", "display 2", "token2", "ip2")
+ yield self._record_user(user1, "abc", "display 2", "token3", "ip3")
+
+ yield self._record_user(user2, "def", "dispkay", "token4", "ip4")
+
+ @defer.inlineCallbacks
+ def _record_user(self, user_id, device_id, display_name,
+ access_token=None, ip=None):
+ device_id = yield self.handler.check_device_registered(
+ user_id=user_id,
+ device_id=device_id,
+ initial_device_display_name=display_name
+ )
+
+ if ip is not None:
+ yield self.store.insert_client_ip(
+ types.UserID.from_string(user_id),
+ access_token, ip, "user_agent", device_id)
+ self.clock.advance_time(1000)
|