From 081e5d55e68b1a55d4f52ef062084d9126ce2231 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 11:14:54 +0100 Subject: Send the correct host header when fetching keys --- synapse/crypto/keyclient.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 54b83da9d8..4fca215c97 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -79,8 +79,7 @@ class SynapseKeyClientProtocol(HTTPClient): self.host = None def connectionMade(self): - self.host = self.transport.getHost() - logger.debug("Connected to %s", self.host) + logger.debug("Connected to %s", self.transport.getPeer()) self.sendCommand(b"GET", self.path) if self.host: self.sendHeader(b"Host", self.host) @@ -124,7 +123,10 @@ class SynapseKeyClientProtocol(HTTPClient): self.timer.cancel() def on_timeout(self): - logger.debug("Timeout waiting for response from %s", self.host) + logger.debug( + "Timeout waiting for response from %s: %s", + self.host, self.transport.getPeer(), + ) self.errback(IOError("Timeout waiting for response")) self.transport.abortConnection() @@ -133,4 +135,5 @@ class SynapseKeyClientFactory(Factory): def protocol(self): protocol = SynapseKeyClientProtocol() protocol.path = self.path + protocol.path = self.host return protocol -- cgit 1.4.1 From cf94a78872397fd97465b4704465a2d03d27d41e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 11:45:53 +0100 Subject: Set host not path --- synapse/crypto/keyclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 4fca215c97..1d85990369 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -135,5 +135,5 @@ class SynapseKeyClientFactory(Factory): def protocol(self): protocol = SynapseKeyClientProtocol() protocol.path = self.path - protocol.path = self.host + protocol.host = self.host return protocol -- cgit 1.4.1 From 55abbe1850efff95efe9935873b666e5fc4bf0e9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 21 Jul 2016 15:55:13 +0100 Subject: make /devices return a list Turns out I specced this to return a list of devices rather than a dict of them --- synapse/handlers/device.py | 10 +++++----- tests/handlers/test_device.py | 11 +++++++---- 2 files changed, 12 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 110f5fbb5c..1f9e15c33c 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -79,17 +79,17 @@ class DeviceHandler(BaseHandler): Args: user_id (str): Returns: - defer.Deferred: dict[str, dict[str, X]]: map from device_id to - info on the device + defer.Deferred: list[dict[str, X]]: info on each device """ - devices = yield self.store.get_devices_by_user(user_id) + device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device( - devices=((user_id, device_id) for device_id in devices.keys()) + devices=((user_id, device_id) for device_id in device_map.keys()) ) - for device in devices.values(): + devices = device_map.values() + for device in devices: _update_device_from_client_ips(device, ips) defer.returnValue(devices) diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 87c3c75aea..331aa13fed 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -84,28 +84,31 @@ class DeviceTestCase(unittest.TestCase): yield self._record_users() res = yield self.handler.get_devices_by_user(user1) - self.assertEqual(3, len(res.keys())) + self.assertEqual(3, len(res)) + device_map = { + d["device_id"]: d for d in res + } self.assertDictContainsSubset({ "user_id": user1, "device_id": "xyz", "display_name": "display 0", "last_seen_ip": None, "last_seen_ts": None, - }, res["xyz"]) + }, device_map["xyz"]) self.assertDictContainsSubset({ "user_id": user1, "device_id": "fco", "display_name": "display 1", "last_seen_ip": "ip1", "last_seen_ts": 1000000, - }, res["fco"]) + }, device_map["fco"]) self.assertDictContainsSubset({ "user_id": user1, "device_id": "abc", "display_name": "display 2", "last_seen_ip": "ip3", "last_seen_ts": 3000000, - }, res["abc"]) + }, device_map["abc"]) @defer.inlineCallbacks def test_get_device(self): -- cgit 1.4.1 From d26b660aa6580b1947f04f7efd598d34a259b970 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 17:38:51 +0100 Subject: Cache getPeer --- synapse/crypto/keyclient.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 1d85990369..c2bd64d6c2 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -77,9 +77,12 @@ class SynapseKeyClientProtocol(HTTPClient): def __init__(self): self.remote_key = defer.Deferred() self.host = None + self._peer = None def connectionMade(self): - logger.debug("Connected to %s", self.transport.getPeer()) + self._peer = self.transport.getPeer() + logger.debug("Connected to %s", self._peer) + self.sendCommand(b"GET", self.path) if self.host: self.sendHeader(b"Host", self.host) @@ -125,7 +128,7 @@ class SynapseKeyClientProtocol(HTTPClient): def on_timeout(self): logger.debug( "Timeout waiting for response from %s: %s", - self.host, self.transport.getPeer(), + self.host, self._peer, ) self.errback(IOError("Timeout waiting for response")) self.transport.abortConnection() -- cgit 1.4.1 From ec5717caf59eb72caf6f82f1643f492f328a4be5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jul 2016 13:14:03 +0100 Subject: Create index on user_ips in the background user_ips is kinda big, so really we want to add the index in the background once we're running. Replace the schema delta with one which will do that. I've done this in a way that's reasonably easy to reuse as there a few other indexes I need, and I don't suppose they will be the last. --- synapse/storage/background_updates.py | 73 +++++++++++++++++++--- synapse/storage/client_ips.py | 16 +++-- synapse/storage/schema/delta/33/user_ips_index.sql | 3 +- 3 files changed, 80 insertions(+), 12 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 66a995157d..75951d0173 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from . import engines from twisted.internet import defer @@ -106,13 +107,13 @@ class BackgroundUpdateStore(SQLBaseStore): ) except: logger.exception("Error doing update") - - if result is None: - logger.info( - "No more background updates to do." - " Unscheduling background update task." - ) - return + else: + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + return @defer.inlineCallbacks def do_background_update(self, desired_duration_ms): @@ -202,6 +203,64 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_background_index_update(self, update_name, index_name, + table, columns): + """Helper for store classes to do a background index addition + + To use: + + 1. use a schema delta file to add a background update. Example: + INSERT INTO background_updates (update_name, progress_json) VALUES + ('my_new_index', '{}'); + + 2. In the Store constructor, call this method + + Args: + update_name (str): update_name to register for + index_name (str): name of index to add + table (str): table to add index to + columns (list[str]): columns/expressions to include in index + """ + + # if this is postgres, we add the indexes concurrently. Otherwise + # we fall back to doing it inline + if isinstance(self.database_engine, engines.PostgresEngine): + conc = True + else: + conc = False + + sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ + % { + "conc": "CONCURRENTLY" if conc else "", + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + + def create_index_concurrently(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + c = conn.cursor() + c.execute(sql) + conn.set_session(autocommit=False) + + def create_index(conn): + c = conn.cursor() + c.execute(sql) + + @defer.inlineCallbacks + def updater(progress, batch_size): + logger.info("Adding index %s to %s", index_name, table) + if conc: + yield self.runWithConnection(create_index_concurrently) + else: + yield self.runWithConnection(create_index) + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, updater) + def start_background_update(self, update_name, progress): """Starts a background update running. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index e31fa53c3f..20eb9ac15f 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,10 +15,11 @@ import logging -from ._base import SQLBaseStore, Cache - from twisted.internet import defer +from ._base import Cache +from . import background_updates + logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller @@ -27,8 +28,7 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -class ClientIpStore(SQLBaseStore): - +class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", @@ -37,6 +37,14 @@ class ClientIpStore(SQLBaseStore): super(ClientIpStore, self).__init__(hs) + self.register_background_index_update( + "user_ips_device_index", + index_name="user_ips_device_id", + table="user_ips", + columns=["user_id", "device_id", "last_seen"], + ) + + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) diff --git a/synapse/storage/schema/delta/33/user_ips_index.sql b/synapse/storage/schema/delta/33/user_ips_index.sql index 8a05677d42..473f75a78e 100644 --- a/synapse/storage/schema/delta/33/user_ips_index.sql +++ b/synapse/storage/schema/delta/33/user_ips_index.sql @@ -13,4 +13,5 @@ * limitations under the License. */ -CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen); +INSERT INTO background_updates (update_name, progress_json) VALUES + ('user_ips_device_index', '{}'); -- cgit 1.4.1 From 363786845b728bcd7146b3d949a86021a96eb2d2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jul 2016 13:21:07 +0100 Subject: PEP8 --- synapse/storage/client_ips.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 20eb9ac15f..71e5ea112f 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -44,7 +44,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) - @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) -- cgit 1.4.1 From dad2da7e54a4f0e92185e4f8553fb51b037c0bd3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 22 Jul 2016 17:00:56 +0100 Subject: Log the hostname the reCAPTCHA was completed on This could be useful information to have in the logs. Also comment about how & why we don't verify the hostname. --- synapse/handlers/auth.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 8f83923ddb..6fff7e7d03 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -279,8 +279,17 @@ class AuthHandler(BaseHandler): data = pde.response resp_body = simplejson.loads(data) - if 'success' in resp_body and resp_body['success']: - defer.returnValue(True) + if 'success' in resp_body: + # Note that we do NOT check the hostname here: we explicitly + # intend the CAPTCHA to be presented by whatever client the + # user is using, we just care that they have completed a CAPTCHA. + logger.info( + "%s reCAPTCHA from hostname %s", + "Successful" if resp_body['success'] else "Failed", + resp_body['hostname'] + ) + if resp_body['success']: + defer.returnValue(True) raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) @defer.inlineCallbacks -- cgit 1.4.1 From 7ed58bb3476c4a18a9af97b8ee3358dac00098eb Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 22 Jul 2016 17:18:50 +0100 Subject: Use get to avoid KeyErrors --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 6fff7e7d03..d5d2072436 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -286,7 +286,7 @@ class AuthHandler(BaseHandler): logger.info( "%s reCAPTCHA from hostname %s", "Successful" if resp_body['success'] else "Failed", - resp_body['hostname'] + resp_body.get('hostname') ) if resp_body['success']: defer.returnValue(True) -- cgit 1.4.1 From 465117d7ca40ba9906697aa023897798f7833830 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 12:10:42 +0100 Subject: Fix background_update tests A bit of a cleanup for background_updates, and make sure that the real background updates have run before we start the unit tests, so that they don't interfere with the tests. --- synapse/storage/background_updates.py | 27 ++++++++++++++++++++------- tests/storage/test_background_update.py | 22 ++++++++++++++++------ 2 files changed, 36 insertions(+), 13 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 75951d0173..2771f7c3c1 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -88,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - while True: - if self._background_update_timer is not None: - return + assert(self._background_update_timer is not None, + "background updates already running") + + logger.info("Starting background schema updates") + while True: sleep = defer.Deferred() self._background_update_timer = self._clock.call_later( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None @@ -102,7 +104,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_timer = None try: - result = yield self.do_background_update( + result = yield self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) except: @@ -113,11 +115,12 @@ class BackgroundUpdateStore(SQLBaseStore): "No more background updates to do." " Unscheduling background update task." ) - return + defer.returnValue() @defer.inlineCallbacks - def do_background_update(self, desired_duration_ms): - """Does some amount of work on a background update + def do_next_background_update(self, desired_duration_ms): + """Does some amount of work on the next queued background update + Args: desired_duration_ms(float): How long we want to spend updating. @@ -136,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_queue.append(update['update_name']) if not self._background_update_queue: + # no work left to do defer.returnValue(None) + # pop from the front, and add back to the back update_name = self._background_update_queue.pop(0) self._background_update_queue.append(update_name) + res = yield self._do_background_update(update_name, desired_duration_ms) + defer.returnValue(res) + + @defer.inlineCallbacks + def _do_background_update(self, update_name, desired_duration_ms): + logger.info("Starting update batch on background update '%s'", + update_name) + update_handler = self._background_update_handlers[update_name] performance = self._background_update_performance.get(update_name) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 6e4d9b1373..4944cb0d2e 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - hs = yield setup_test_homeserver() + hs = yield setup_test_homeserver() # type: synapse.server.HomeServer self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -20,11 +20,20 @@ class BackgroundUpdateTestCase(unittest.TestCase): "test_update", self.update_handler ) + # run the real background updates, to get them out the way + # (perhaps we should run them as part of the test HS setup, since we + # run all of the other schema setup stuff there?) + while True: + res = yield self.store.do_next_background_update(1000) + if res is None: + break + @defer.inlineCallbacks def test_do_background_update(self): desired_count = 1000 duration_ms = 42 + # first step: make a bit of progress @defer.inlineCallbacks def update(progress, count): self.clock.advance_time_msec(count * duration_ms) @@ -42,7 +51,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): yield self.store.start_background_update("test_update", {"my_key": 1}) self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNotNone(result) @@ -50,24 +59,25 @@ class BackgroundUpdateTestCase(unittest.TestCase): {"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE ) + # second step: complete the update @defer.inlineCallbacks def update(progress, count): yield self.store._end_background_update("test_update") defer.returnValue(count) self.update_handler.side_effect = update - self.update_handler.reset_mock() - result = yield self.store.do_background_update( - duration_ms * desired_count + result = yield self.store.do_next_background_update( + duration_ms * desired_count ) self.assertIsNotNone(result) self.update_handler.assert_called_once_with( {"my_key": 2}, desired_count ) + # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNone(result) -- cgit 1.4.1 From 9dbd903f4108c81499205ff80d9d420911fd0f54 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 14:05:23 +0100 Subject: background updates: Fix assertion to do something --- synapse/storage/background_updates.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 2771f7c3c1..321c889b2f 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -88,8 +88,8 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - assert(self._background_update_timer is not None, - "background updates already running") + assert self._background_update_timer is not None, \ + "background updates already running" logger.info("Starting background schema updates") -- cgit 1.4.1 From 2ee4c9ee023a50ae7c0800c34c609886fb27298f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 16:01:46 +0100 Subject: background updates: fix assert again --- synapse/storage/background_updates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 321c889b2f..af9bfbbe47 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -88,7 +88,7 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - assert self._background_update_timer is not None, \ + assert self._background_update_timer is None, \ "background updates already running" logger.info("Starting background schema updates") -- cgit 1.4.1 From 955ef1f06caee7385cb5ef21477b4d0490889c3c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Jul 2016 16:04:45 +0100 Subject: fix: defer.returnValue takes one argument --- synapse/storage/background_updates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index af9bfbbe47..30d0e4c5dc 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -115,7 +115,7 @@ class BackgroundUpdateStore(SQLBaseStore): "No more background updates to do." " Unscheduling background update task." ) - defer.returnValue() + defer.returnValue(None) @defer.inlineCallbacks def do_next_background_update(self, desired_duration_ms): -- cgit 1.4.1 From 2623cec8746392067e781164e8ed4f2236b15bec Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Jul 2016 16:12:16 +0100 Subject: Don't add rejections to the state_group, persist all rejections --- synapse/storage/events.py | 9 +++++---- synapse/storage/state.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6610549281..41c9b17d14 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -591,10 +591,11 @@ class EventsStore(SQLBaseStore): ], ) - if context.rejected: - self._store_rejections_txn( - txn, event.event_id, context.rejected - ) + for event, context in events_and_contexts: + if context.rejected: + self._store_rejections_txn( + txn, event.event_id, context.rejected + ) self._simple_insert_many_txn( txn, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5b743db67a..cc1c7ec6a7 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -79,7 +79,7 @@ class StateStore(SQLBaseStore): state_events = dict(context.current_state) - if event.is_state(): + if event.is_state() and not context.rejected: state_events[(event.type, event.state_key)] = event state_group = context.new_state_group_id -- cgit 1.4.1 From 8f7f4cb92baa1eb8c772644e2567fe56d563b4b9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Jul 2016 17:13:37 +0100 Subject: Don't add the events to forward extremities if the event is rejected --- synapse/storage/events.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 41c9b17d14..201a4455fa 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -498,8 +498,8 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - - self._update_extremeties(txn, [event]) + if not context.rejected: + self._update_extremeties(txn, [event]) events_and_contexts = [ ec for ec in events_and_contexts if ec[0] not in to_remove @@ -512,7 +512,10 @@ class EventsStore(SQLBaseStore): self._handle_mult_prev_events( txn, - events=[event for event, _ in events_and_contexts], + events=[ + event for event, context in events_and_contexts + if not context.rejected + ], ) for event, _ in events_and_contexts: -- cgit 1.4.1 From 33d08e843368a9caf01835ec4d56160fdc0f9469 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 21 Jul 2016 15:56:57 +0100 Subject: Log when adding listeners --- synapse/http/server.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse') diff --git a/synapse/http/server.py b/synapse/http/server.py index f705abab94..2b3c05a740 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -205,6 +205,7 @@ class JsonResource(HttpServer, resource.Resource): def register_paths(self, method, path_patterns, callback): for path_pattern in path_patterns: + logger.debug("Registering for %s %s", method, path_pattern.pattern) self.path_regexs.setdefault(method, []).append( self._PathEntry(path_pattern, callback) ) -- cgit 1.4.1 From 1b3c3e6d68bf503bf09e046ecf57bb652669e637 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Jul 2016 18:44:30 +0100 Subject: Only update the events and event_json tables for rejected events --- synapse/storage/events.py | 113 +++++++++++++++++++++++++--------------------- synapse/storage/state.py | 2 +- 2 files changed, 63 insertions(+), 52 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 201a4455fa..c38a631081 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -407,21 +407,11 @@ class EventsStore(SQLBaseStore): event.room_id, event.internal_metadata.stream_ordering, ) - if not event.internal_metadata.is_outlier(): + if not event.internal_metadata.is_outlier() and not context.rejected: depth_updates[event.room_id] = max( event.depth, depth_updates.get(event.room_id, event.depth) ) - if context.push_actions: - self._set_push_actions_for_event_and_users_txn( - txn, event, context.push_actions - ) - - if event.type == EventTypes.Redaction and event.redacts is not None: - self._remove_push_actions_for_event_id_txn( - txn, event.room_id, event.redacts - ) - for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) @@ -431,6 +421,7 @@ class EventsStore(SQLBaseStore): ), [event.event_id for event, _ in events_and_contexts] ) + have_persisted = { event_id: outlier for event_id, outlier in txn.fetchall() @@ -442,6 +433,9 @@ class EventsStore(SQLBaseStore): # Handle the case of the list including the same event multiple # times. The tricky thing here is when they differ by whether # they are an outlier. + if context.rejected: + continue + if event.event_id in event_map: other = event_map[event.event_id] @@ -498,8 +492,8 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - if not context.rejected: - self._update_extremeties(txn, [event]) + + self._update_extremeties(txn, [event]) events_and_contexts = [ ec for ec in events_and_contexts if ec[0] not in to_remove @@ -508,39 +502,8 @@ class EventsStore(SQLBaseStore): if not events_and_contexts: return - self._store_mult_state_groups_txn(txn, events_and_contexts) - - self._handle_mult_prev_events( - txn, - events=[ - event for event, context in events_and_contexts - if not context.rejected - ], - ) - - for event, _ in events_and_contexts: - if event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Message: - self._store_room_message_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - elif event.type == EventTypes.RoomHistoryVisibility: - self._store_history_visibility_txn(txn, event) - elif event.type == EventTypes.GuestAccess: - self._store_guest_access_txn(txn, event) - - self._store_room_members_txn( - txn, - [ - event - for event, _ in events_and_contexts - if event.type == EventTypes.Member - ], - backfilled=backfilled, - ) + # From this point onwards the events are only events that we haven't + # seen before. def event_dict(event): return { @@ -594,11 +557,28 @@ class EventsStore(SQLBaseStore): ], ) + to_remove = set() for event, context in events_and_contexts: if context.rejected: self._store_rejections_txn( txn, event.event_id, context.rejected ) + to_remove.add(event.event_id) + + events_and_contexts = [ + ec for ec in events_and_contexts if ec[0].event_id not in to_remove + ] + + if not events_and_contexts: + return + + # From this point onwards the events are only ones that weren't rejected. + + for event, context in events_and_contexts: + if context.push_actions: + self._set_push_actions_for_event_and_users_txn( + txn, event, context.push_actions + ) self._simple_insert_many_txn( txn, @@ -614,6 +594,42 @@ class EventsStore(SQLBaseStore): ], ) + if event.type == EventTypes.Redaction and event.redacts is not None: + self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts + ) + + self._store_mult_state_groups_txn(txn, events_and_contexts) + + self._handle_mult_prev_events( + txn, + events=[event for event, _ in events_and_contexts], + ) + + for event, _ in events_and_contexts: + if event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Message: + self._store_room_message_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + elif event.type == EventTypes.RoomHistoryVisibility: + self._store_history_visibility_txn(txn, event) + elif event.type == EventTypes.GuestAccess: + self._store_guest_access_txn(txn, event) + + self._store_room_members_txn( + txn, + [ + event + for event, _ in events_and_contexts + if event.type == EventTypes.Member + ], + backfilled=backfilled, + ) + self._store_event_reference_hashes_txn( txn, [event for event, _ in events_and_contexts] ) @@ -670,11 +686,6 @@ class EventsStore(SQLBaseStore): # Outlier events shouldn't clobber the current state. continue - if context.rejected: - # If the event failed it's auth checks then it shouldn't - # clobbler the current state. - continue - txn.call_after( self._get_current_state_for_key.invalidate, (event.room_id, event.type, event.state_key,) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index cc1c7ec6a7..5b743db67a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -79,7 +79,7 @@ class StateStore(SQLBaseStore): state_events = dict(context.current_state) - if event.is_state() and not context.rejected: + if event.is_state(): state_events[(event.type, event.state_key)] = event state_group = context.new_state_group_id -- cgit 1.4.1 From 436bffd15fb8382a0d2dddd3c6f7a077ba751da2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jul 2016 14:52:53 +0100 Subject: Implement deleting devices --- synapse/handlers/auth.py | 22 ++++++++++++++++-- synapse/handlers/device.py | 27 +++++++++++++++++++++- synapse/rest/client/v1/login.py | 13 ++++++++--- synapse/rest/client/v2_alpha/devices.py | 14 +++++++++++ synapse/rest/client/v2_alpha/register.py | 10 ++++---- synapse/storage/devices.py | 15 ++++++++++++ synapse/storage/registration.py | 26 +++++++++++++++++---- .../schema/delta/33/access_tokens_device_index.sql | 17 ++++++++++++++ .../schema/delta/33/refreshtoken_device_index.sql | 17 ++++++++++++++ tests/handlers/test_device.py | 22 ++++++++++++++++-- tests/rest/client/v2_alpha/test_register.py | 14 +++++++---- 11 files changed, 176 insertions(+), 21 deletions(-) create mode 100644 synapse/storage/schema/delta/33/access_tokens_device_index.sql create mode 100644 synapse/storage/schema/delta/33/refreshtoken_device_index.sql (limited to 'synapse') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d5d2072436..2e138f328f 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -77,6 +77,7 @@ class AuthHandler(BaseHandler): self.ldap_bind_password = hs.config.ldap_bind_password self.hs = hs # FIXME better possibility to access registrationHandler later? + self.device_handler = hs.get_device_handler() @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip): @@ -374,7 +375,8 @@ class AuthHandler(BaseHandler): return self._check_password(user_id, password) @defer.inlineCallbacks - def get_login_tuple_for_user_id(self, user_id, device_id=None): + def get_login_tuple_for_user_id(self, user_id, device_id=None, + initial_display_name=None): """ Gets login tuple for the user with the given user ID. @@ -383,9 +385,15 @@ class AuthHandler(BaseHandler): The user is assumed to have been authenticated by some other machanism (e.g. CAS), and the user_id converted to the canonical case. + The device will be recorded in the table if it is not there already. + Args: user_id (str): canonical User ID - device_id (str): the device ID to associate with the access token + device_id (str|None): the device ID to associate with the tokens. + None to leave the tokens unassociated with a device (deprecated: + we should always have a device ID) + initial_display_name (str): display name to associate with the + device if it needs re-registering Returns: A tuple of: The access token for the user's session. @@ -397,6 +405,16 @@ class AuthHandler(BaseHandler): logger.info("Logging in user %s on device %s", user_id, device_id) access_token = yield self.issue_access_token(user_id, device_id) refresh_token = yield self.issue_refresh_token(user_id, device_id) + + # the device *should* have been registered before we got here; however, + # it's possible we raced against a DELETE operation. The thing we + # really don't want is active access_tokens without a record of the + # device, so we double-check it here. + if device_id is not None: + yield self.device_handler.check_device_registered( + user_id, device_id, initial_display_name + ) + defer.returnValue((access_token, refresh_token)) @defer.inlineCallbacks diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 1f9e15c33c..a7a192e1c9 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -100,7 +100,7 @@ class DeviceHandler(BaseHandler): Args: user_id (str): - device_id (str) + device_id (str): Returns: defer.Deferred: dict[str, X]: info on the device @@ -117,6 +117,31 @@ class DeviceHandler(BaseHandler): _update_device_from_client_ips(device, ips) defer.returnValue(device) + @defer.inlineCallbacks + def delete_device(self, user_id, device_id): + """ Delete the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: + """ + + try: + yield self.store.delete_device(user_id, device_id) + except errors.StoreError, e: + if e.code == 404: + # no match + pass + else: + raise + + yield self.store.user_delete_access_tokens(user_id, + device_id=device_id) + + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index e8b791519c..92fcae674a 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -152,7 +152,10 @@ class LoginRestServlet(ClientV1RestServlet): ) device_id = yield self._register_device(user_id, login_submission) access_token, refresh_token = ( - yield auth_handler.get_login_tuple_for_user_id(user_id, device_id) + yield auth_handler.get_login_tuple_for_user_id( + user_id, device_id, + login_submission.get("initial_device_display_name") + ) ) result = { "user_id": user_id, # may have changed @@ -173,7 +176,10 @@ class LoginRestServlet(ClientV1RestServlet): ) device_id = yield self._register_device(user_id, login_submission) access_token, refresh_token = ( - yield auth_handler.get_login_tuple_for_user_id(user_id, device_id) + yield auth_handler.get_login_tuple_for_user_id( + user_id, device_id, + login_submission.get("initial_device_display_name") + ) ) result = { "user_id": user_id, # may have changed @@ -262,7 +268,8 @@ class LoginRestServlet(ClientV1RestServlet): ) access_token, refresh_token = ( yield auth_handler.get_login_tuple_for_user_id( - registered_user_id, device_id + registered_user_id, device_id, + login_submission.get("initial_device_display_name") ) ) result = { diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index 8b9ab4f674..30ef8b3da9 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -70,6 +70,20 @@ class DeviceRestServlet(RestServlet): ) defer.returnValue((200, device)) + @defer.inlineCallbacks + def on_DELETE(self, request, device_id): + # XXX: it's not completely obvious we want to expose this endpoint. + # It allows the client to delete access tokens, which feels like a + # thing which merits extra auth. But if we want to do the interactive- + # auth dance, we should really make it possible to delete more than one + # device at a time. + requester = yield self.auth.get_user_by_req(request) + yield self.device_handler.delete_device( + requester.user.to_string(), + device_id, + ) + defer.returnValue((200, {})) + def register_servlets(hs, http_server): DevicesRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c8c9395fc6..9f599ea8bb 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -374,13 +374,13 @@ class RegisterRestServlet(RestServlet): """ device_id = yield self._register_device(user_id, params) - access_token = yield self.auth_handler.issue_access_token( - user_id, device_id=device_id + access_token, refresh_token = ( + yield self.auth_handler.get_login_tuple_for_user_id( + user_id, device_id=device_id, + initial_display_name=params.get("initial_device_display_name") + ) ) - refresh_token = yield self.auth_handler.issue_refresh_token( - user_id, device_id=device_id - ) defer.returnValue({ "user_id": user_id, "access_token": access_token, diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 1cc6e07f2b..4689980f80 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -76,6 +76,21 @@ class DeviceStore(SQLBaseStore): desc="get_device", ) + def delete_device(self, user_id, device_id): + """Delete a device. + + Args: + user_id (str): The ID of the user which owns the device + device_id (str): The ID of the device to retrieve + Returns: + defer.Deferred + """ + return self._simple_delete_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_device", + ) + @defer.inlineCallbacks def get_devices_by_user(self, user_id): """Retrieve all of a user's registered devices. diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9a92b35361..935e82bf7a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -18,18 +18,31 @@ import re from twisted.internet import defer from synapse.api.errors import StoreError, Codes - -from ._base import SQLBaseStore +from synapse.storage import background_updates from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -class RegistrationStore(SQLBaseStore): +class RegistrationStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): super(RegistrationStore, self).__init__(hs) self.clock = hs.get_clock() + self.register_background_index_update( + "access_tokens_device_index", + index_name="access_tokens_device_id", + table="access_tokens", + columns=["user_id", "device_id"], + ) + + self.register_background_index_update( + "refresh_tokens_device_index", + index_name="refresh_tokens_device_id", + table="refresh_tokens", + columns=["user_id", "device_id"], + ) + @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id=None): """Adds an access token for the given user. @@ -238,11 +251,16 @@ class RegistrationStore(SQLBaseStore): self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[]): + def user_delete_access_tokens(self, user_id, except_token_ids=[], + device_id=None): def f(txn): sql = "SELECT token FROM access_tokens WHERE user_id = ?" clauses = [user_id] + if device_id is not None: + sql += " AND device_id = ?" + clauses.append(device_id) + if except_token_ids: sql += " AND id NOT IN (%s)" % ( ",".join(["?" for _ in except_token_ids]), diff --git a/synapse/storage/schema/delta/33/access_tokens_device_index.sql b/synapse/storage/schema/delta/33/access_tokens_device_index.sql new file mode 100644 index 0000000000..61ad3fe3e8 --- /dev/null +++ b/synapse/storage/schema/delta/33/access_tokens_device_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('access_tokens_device_index', '{}'); diff --git a/synapse/storage/schema/delta/33/refreshtoken_device_index.sql b/synapse/storage/schema/delta/33/refreshtoken_device_index.sql new file mode 100644 index 0000000000..bb225dafbf --- /dev/null +++ b/synapse/storage/schema/delta/33/refreshtoken_device_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('refresh_tokens_device_index', '{}'); diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 331aa13fed..214e722eb3 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -12,11 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse import types + from twisted.internet import defer +import synapse.api.errors import synapse.handlers.device + import synapse.storage +from synapse import types from tests import unittest, utils user1 = "@boris:aaa" @@ -27,7 +30,7 @@ class DeviceTestCase(unittest.TestCase): def __init__(self, *args, **kwargs): super(DeviceTestCase, self).__init__(*args, **kwargs) self.store = None # type: synapse.storage.DataStore - self.handler = None # type: device.DeviceHandler + self.handler = None # type: synapse.handlers.device.DeviceHandler self.clock = None # type: utils.MockClock @defer.inlineCallbacks @@ -123,6 +126,21 @@ class DeviceTestCase(unittest.TestCase): "last_seen_ts": 3000000, }, res) + @defer.inlineCallbacks + def test_delete_device(self): + yield self._record_users() + + # delete the device + yield self.handler.delete_device(user1, "abc") + + # check the device was deleted + with self.assertRaises(synapse.api.errors.NotFoundError): + yield self.handler.get_device(user1, "abc") + + # we'd like to check the access token was invalidated, but that's a + # bit of a PITA. + + @defer.inlineCallbacks def _record_users(self): # check this works for both devices which have a recorded client_ip, diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 3bd7065e32..8ac56a1fb2 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -65,13 +65,16 @@ class RegisterRestServletTestCase(unittest.TestCase): self.registration_handler.appservice_register = Mock( return_value=user_id ) - self.auth_handler.issue_access_token = Mock(return_value=token) + self.auth_handler.get_login_tuple_for_user_id = Mock( + return_value=(token, "kermits_refresh_token") + ) (code, result) = yield self.servlet.on_POST(self.request) self.assertEquals(code, 200) det_data = { "user_id": user_id, "access_token": token, + "refresh_token": "kermits_refresh_token", "home_server": self.hs.hostname } self.assertDictContainsSubset(det_data, result) @@ -121,7 +124,9 @@ class RegisterRestServletTestCase(unittest.TestCase): "password": "monkey" }, None) self.registration_handler.register = Mock(return_value=(user_id, None)) - self.auth_handler.issue_access_token = Mock(return_value=token) + self.auth_handler.get_login_tuple_for_user_id = Mock( + return_value=(token, "kermits_refresh_token") + ) self.device_handler.check_device_registered = \ Mock(return_value=device_id) @@ -130,13 +135,14 @@ class RegisterRestServletTestCase(unittest.TestCase): det_data = { "user_id": user_id, "access_token": token, + "refresh_token": "kermits_refresh_token", "home_server": self.hs.hostname, "device_id": device_id, } self.assertDictContainsSubset(det_data, result) self.assertIn("refresh_token", result) - self.auth_handler.issue_access_token.assert_called_once_with( - user_id, device_id=device_id) + self.auth_handler.get_login_tuple_for_user_id( + user_id, device_id=device_id, initial_device_display_name=None) def test_POST_disabled_registration(self): self.hs.config.enable_registration = False -- cgit 1.4.1 From 012b4c19132d57fdbc1b6b0e304eb60eaf19200f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 17:51:24 +0100 Subject: Implement updating devices You can update the displayname of devices now. --- synapse/handlers/device.py | 24 ++++++++++++++++++++++ synapse/rest/client/v2_alpha/devices.py | 24 +++++++++++++++------- synapse/storage/devices.py | 27 ++++++++++++++++++++++++- tests/handlers/test_device.py | 16 +++++++++++++++ tests/storage/test_devices.py | 36 +++++++++++++++++++++++++++++++++ 5 files changed, 119 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a7a192e1c9..9e65d85e6d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -141,6 +141,30 @@ class DeviceHandler(BaseHandler): yield self.store.user_delete_access_tokens(user_id, device_id=device_id) + @defer.inlineCallbacks + def update_device(self, user_id, device_id, content): + """ Update the given device + + Args: + user_id (str): + device_id (str): + content (dict): body of update request + + Returns: + defer.Deferred: + """ + + try: + yield self.store.update_device( + user_id, + device_id, + new_display_name=content.get("display_name") + ) + except errors.StoreError, e: + if e.code == 404: + raise errors.NotFoundError() + else: + raise def _update_device_from_client_ips(device, client_ips): diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index 30ef8b3da9..8fbd3d3dfc 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -13,19 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import logging -from synapse.http.servlet import RestServlet +from twisted.internet import defer +from synapse.http import servlet from ._base import client_v2_patterns -import logging - - logger = logging.getLogger(__name__) -class DevicesRestServlet(RestServlet): +class DevicesRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns("/devices$", releases=[], v2_alpha=False) def __init__(self, hs): @@ -47,7 +45,7 @@ class DevicesRestServlet(RestServlet): defer.returnValue((200, {"devices": devices})) -class DeviceRestServlet(RestServlet): +class DeviceRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns("/devices/(?P[^/]*)$", releases=[], v2_alpha=False) @@ -84,6 +82,18 @@ class DeviceRestServlet(RestServlet): ) defer.returnValue((200, {})) + @defer.inlineCallbacks + def on_PUT(self, request, device_id): + requester = yield self.auth.get_user_by_req(request) + + body = servlet.parse_json_object_from_request(request) + yield self.device_handler.update_device( + requester.user.to_string(), + device_id, + body + ) + defer.returnValue((200, {})) + def register_servlets(hs, http_server): DevicesRestServlet(hs).register(http_server) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 4689980f80..afd6530cab 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -81,7 +81,7 @@ class DeviceStore(SQLBaseStore): Args: user_id (str): The ID of the user which owns the device - device_id (str): The ID of the device to retrieve + device_id (str): The ID of the device to delete Returns: defer.Deferred """ @@ -91,6 +91,31 @@ class DeviceStore(SQLBaseStore): desc="delete_device", ) + def update_device(self, user_id, device_id, new_display_name=None): + """Update a device. + + Args: + user_id (str): The ID of the user which owns the device + device_id (str): The ID of the device to update + new_display_name (str|None): new displayname for device; None + to leave unchanged + Raises: + StoreError: if the device is not found + Returns: + defer.Deferred + """ + updates = {} + if new_display_name is not None: + updates["display_name"] = new_display_name + if not updates: + return defer.succeed(None) + return self._simple_update_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + updatevalues=updates, + desc="update_device", + ) + @defer.inlineCallbacks def get_devices_by_user(self, user_id): """Retrieve all of a user's registered devices. diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 214e722eb3..85a970a6c9 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -140,6 +140,22 @@ class DeviceTestCase(unittest.TestCase): # we'd like to check the access token was invalidated, but that's a # bit of a PITA. + @defer.inlineCallbacks + def test_update_device(self): + yield self._record_users() + + update = {"display_name": "new display"} + yield self.handler.update_device(user1, "abc", update) + + res = yield self.handler.get_device(user1, "abc") + self.assertEqual(res["display_name"], "new display") + + @defer.inlineCallbacks + def test_update_unknown_device(self): + update = {"display_name": "new_display"} + with self.assertRaises(synapse.api.errors.NotFoundError): + yield self.handler.update_device("user_id", "unknown_device_id", + update) @defer.inlineCallbacks def _record_users(self): diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index a6ce993375..f8725acea0 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -15,6 +15,7 @@ from twisted.internet import defer +import synapse.api.errors import tests.unittest import tests.utils @@ -67,3 +68,38 @@ class DeviceStoreTestCase(tests.unittest.TestCase): "device_id": "device2", "display_name": "display_name 2", }, res["device2"]) + + @defer.inlineCallbacks + def test_update_device(self): + yield self.store.store_device( + "user_id", "device_id", "display_name 1" + ) + + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 1", res["display_name"]) + + # do a no-op first + yield self.store.update_device( + "user_id", "device_id", + ) + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 1", res["display_name"]) + + # do the update + yield self.store.update_device( + "user_id", "device_id", + new_display_name="display_name 2", + ) + + # check it worked + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 2", res["display_name"]) + + @defer.inlineCallbacks + def test_update_unknown_device(self): + with self.assertRaises(synapse.api.errors.StoreError) as cm: + yield self.store.update_device( + "user_id", "unknown_device_id", + new_display_name="display_name 2", + ) + self.assertEqual(404, cm.exception.code) -- cgit 1.4.1 From 242c52d607da68f48b3a4bce980663e0e5f103c6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 26 Jul 2016 10:09:25 +0200 Subject: typo --- synapse/util/metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index e1f374807e..0b944d3e63 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -84,7 +84,7 @@ class Measure(object): if context != self.start_context: logger.warn( - "Context have unexpectedly changed from '%s' to '%s'. (%r)", + "Context has unexpectedly changed from '%s' to '%s'. (%r)", context, self.start_context, self.name ) return -- cgit 1.4.1 From efeb6176c169835465eeb6184ead940a89b93b4e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 10:49:52 +0100 Subject: Don't add rejected events if we've seen them befrore. Add some comments to explain what the code is doing mechanically --- synapse/storage/events.py | 53 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c38a631081..25a2be2795 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -397,6 +397,12 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled): + """Insert some number of room events into the necessary database tables. + + Rejected events are only inserted into the events table, the events_json table, + and the rejections table. Things reading from those table will need to check + whether the event was rejected. + """ depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -427,15 +433,21 @@ class EventsStore(SQLBaseStore): for event_id, outlier in txn.fetchall() } + # Remove the events that we've seen before. event_map = {} to_remove = set() for event, context in events_and_contexts: - # Handle the case of the list including the same event multiple - # times. The tricky thing here is when they differ by whether - # they are an outlier. if context.rejected: + # If the event is rejected then we don't care if the event + # was an outlier or not. + if event.event_id in have_persisted: + # If we have already seen the event then ignore it. + to_remove.add(event) continue + # Handle the case of the list including the same event multiple + # times. The tricky thing here is when they differ by whether + # they are an outlier. if event.event_id in event_map: other = event_map[event.event_id] @@ -457,6 +469,12 @@ class EventsStore(SQLBaseStore): outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: + # We received a copy of an event that we had already stored as + # an outlier in the database. We now have some state at that + # so we need to update the state_groups table with that state. + + # insert into the state_group, state_groups_state and + # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, ((event, context),)) metadata_json = encode_json( @@ -472,6 +490,8 @@ class EventsStore(SQLBaseStore): (metadata_json, event.event_id,) ) + # Add an entry to the ex_outlier_stream table to replicate the + # change in outlier status to our workers. stream_order = event.internal_metadata.stream_ordering state_group_id = context.state_group or context.new_state_group_id self._simple_insert_txn( @@ -493,6 +513,8 @@ class EventsStore(SQLBaseStore): (False, event.event_id,) ) + # Update the event_backward_extremities table now that this + # event isn't an outlier any more. self._update_extremeties(txn, [event]) events_and_contexts = [ @@ -557,24 +579,30 @@ class EventsStore(SQLBaseStore): ], ) + # Remove the rejected events from the list now that we've added them + # to the events table and the events_json table. to_remove = set() for event, context in events_and_contexts: if context.rejected: + # Insert the event_id into the rejections table self._store_rejections_txn( txn, event.event_id, context.rejected ) - to_remove.add(event.event_id) + to_remove.add(event) events_and_contexts = [ - ec for ec in events_and_contexts if ec[0].event_id not in to_remove + ec for ec in events_and_contexts if ec[0] not in to_remove ] if not events_and_contexts: + # Make sure we don't pass an empty list to functions that expect to + # be storing at least one element. return # From this point onwards the events are only ones that weren't rejected. for event, context in events_and_contexts: + # Insert all the push actions into the event_push_actions table. if context.push_actions: self._set_push_actions_for_event_and_users_txn( txn, event, context.push_actions @@ -595,12 +623,18 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Redaction and event.redacts is not None: + # Remove the entries in the event_push_actions table for the + # redacted event. self._remove_push_actions_for_event_id_txn( txn, event.room_id, event.redacts ) + # Insert into the state_groups, state_groups_state, and + # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, events_and_contexts) + # Update the event_forward_extremities, event_backward_extremities and + # event_edges tables. self._handle_mult_prev_events( txn, events=[event for event, _ in events_and_contexts], @@ -608,18 +642,25 @@ class EventsStore(SQLBaseStore): for event, _ in events_and_contexts: if event.type == EventTypes.Name: + # Insert into the room_names and event_search tables. self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: + # Insert into the topics table and event_search table. self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Message: + # Insert into the event_search table. self._store_room_message_txn(txn, event) elif event.type == EventTypes.Redaction: + # Insert into the redactions table. self._store_redaction(txn, event) elif event.type == EventTypes.RoomHistoryVisibility: + # Insert into the event_search table. self._store_history_visibility_txn(txn, event) elif event.type == EventTypes.GuestAccess: + # Insert into the event_search table. self._store_guest_access_txn(txn, event) + # Insert into the room_memberships table. self._store_room_members_txn( txn, [ @@ -630,6 +671,7 @@ class EventsStore(SQLBaseStore): backfilled=backfilled, ) + # Insert event_reference_hashes table. self._store_event_reference_hashes_txn( txn, [event for event, _ in events_and_contexts] ) @@ -674,6 +716,7 @@ class EventsStore(SQLBaseStore): ], ) + # Prefil the event cache self._add_to_cache(txn, events_and_contexts) if backfilled: -- cgit 1.4.1 From a6f06ce3e280cfa18f51748a7d4327001658db40 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 11:05:39 +0100 Subject: Fix how push_actions are redacted. --- synapse/storage/events.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 25a2be2795..c63ca36df6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -522,6 +522,8 @@ class EventsStore(SQLBaseStore): ] if not events_and_contexts: + # Make sure we don't pass an empty list to functions that expect to + # be storing at least one element. return # From this point onwards the events are only events that we haven't @@ -608,6 +610,13 @@ class EventsStore(SQLBaseStore): txn, event, context.push_actions ) + if event.type == EventTypes.Redaction and event.redacts is not None: + # Remove the entries in the event_push_actions table for the + # redacted event. + self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts + ) + self._simple_insert_many_txn( txn, table="event_auth", @@ -622,13 +631,6 @@ class EventsStore(SQLBaseStore): ], ) - if event.type == EventTypes.Redaction and event.redacts is not None: - # Remove the entries in the event_push_actions table for the - # redacted event. - self._remove_push_actions_for_event_id_txn( - txn, event.room_id, event.redacts - ) - # Insert into the state_groups, state_groups_state, and # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, events_and_contexts) @@ -716,7 +718,7 @@ class EventsStore(SQLBaseStore): ], ) - # Prefil the event cache + # Prefill the event cache self._add_to_cache(txn, events_and_contexts) if backfilled: -- cgit 1.4.1 From 8e0249416643f20f0c4cd8f2e19cf45ea63289d3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 11:09:47 +0100 Subject: Delete refresh tokens when deleting devices --- synapse/handlers/device.py | 6 ++-- synapse/storage/registration.py | 58 +++++++++++++++++++++++++++++--------- tests/storage/test_registration.py | 34 ++++++++++++++++++++++ 3 files changed, 83 insertions(+), 15 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9e65d85e6d..eaead50800 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -138,8 +138,10 @@ class DeviceHandler(BaseHandler): else: raise - yield self.store.user_delete_access_tokens(user_id, - device_id=device_id) + yield self.store.user_delete_access_tokens( + user_id, device_id=device_id, + delete_refresh_tokens=True, + ) @defer.inlineCallbacks def update_device(self, user_id, device_id, content): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 935e82bf7a..d9555e073a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -252,20 +252,36 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[], - device_id=None): - def f(txn): - sql = "SELECT token FROM access_tokens WHERE user_id = ?" + device_id=None, + delete_refresh_tokens=False): + """ + Invalidate access/refresh tokens belonging to a user + + Args: + user_id (str): ID of user the tokens belong to + except_token_ids (list[str]): list of access_tokens which should + *not* be deleted + device_id (str|None): ID of device the tokens are associated with. + If None, tokens associated with any device (or no device) will + be deleted + delete_refresh_tokens (bool): True to delete refresh tokens as + well as access tokens. + Returns: + defer.Deferred: + """ + def f(txn, table, except_tokens, call_after_delete): + sql = "SELECT token FROM %s WHERE user_id = ?" % table clauses = [user_id] if device_id is not None: sql += " AND device_id = ?" clauses.append(device_id) - if except_token_ids: + if except_tokens: sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_token_ids]), + ",".join(["?" for _ in except_tokens]), ) - clauses += except_token_ids + clauses += except_tokens txn.execute(sql, clauses) @@ -274,16 +290,33 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): n = 100 chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] for chunk in chunks: - for row in chunk: - txn.call_after(self.get_user_by_access_token.invalidate, (row[0],)) + if call_after_delete: + for row in chunk: + txn.call_after(call_after_delete, (row[0],)) txn.execute( - "DELETE FROM access_tokens WHERE token in (%s)" % ( + "DELETE FROM %s WHERE token in (%s)" % ( + table, ",".join(["?" for _ in chunk]), ), [r[0] for r in chunk] ) - yield self.runInteraction("user_delete_access_tokens", f) + # delete refresh tokens first, to stop new access tokens being + # allocated while our backs are turned + if delete_refresh_tokens: + yield self.runInteraction( + "user_delete_access_tokens", f, + table="refresh_tokens", + except_tokens=[], + call_after_delete=None, + ) + + yield self.runInteraction( + "user_delete_access_tokens", f, + table="access_tokens", + except_tokens=except_token_ids, + call_after_delete=self.get_user_by_access_token.invalidate, + ) def delete_access_token(self, access_token): def f(txn): @@ -306,9 +339,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: token (str): The access token of a user. Returns: - dict: Including the name (user_id) and the ID of their access token. - Raises: - StoreError if no user was found. + defer.Deferred: None, if the token did not match, ootherwise dict + including the keys `name`, `is_guest`, `device_id`, `token_id`. """ return self.runInteraction( "get_user_by_access_token", diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index b03ca303a2..f7d74dea8e 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -128,6 +128,40 @@ class RegistrationStoreTestCase(unittest.TestCase): with self.assertRaises(StoreError): yield self.store.exchange_refresh_token(last_token, generator.generate) + @defer.inlineCallbacks + def test_user_delete_access_tokens(self): + # add some tokens + generator = TokenGenerator() + refresh_token = generator.generate(self.user_id) + yield self.store.register(self.user_id, self.tokens[0], self.pwhash) + yield self.store.add_access_token_to_user(self.user_id, self.tokens[1], + self.device_id) + yield self.store.add_refresh_token_to_user(self.user_id, refresh_token, + self.device_id) + + # now delete some + yield self.store.user_delete_access_tokens( + self.user_id, device_id=self.device_id, delete_refresh_tokens=True) + + # check they were deleted + user = yield self.store.get_user_by_access_token(self.tokens[1]) + self.assertIsNone(user, "access token was not deleted by device_id") + with self.assertRaises(StoreError): + yield self.store.exchange_refresh_token(refresh_token, + generator.generate) + + # check the one not associated with the device was not deleted + user = yield self.store.get_user_by_access_token(self.tokens[0]) + self.assertEqual(self.user_id, user["name"]) + + # now delete the rest + yield self.store.user_delete_access_tokens( + self.user_id, delete_refresh_tokens=True) + + user = yield self.store.get_user_by_access_token(self.tokens[0]) + self.assertIsNone(user, + "access token was not deleted without device_id") + class TokenGenerator: def __init__(self): -- cgit 1.4.1 From 05e7e5e972446b639997f0ea461c2eea39617342 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 11:59:08 +0100 Subject: Fix flake8 violation Apparently flake8 v3 puts the error on a different line to v2. Easiest way to make sure that happens is by putting the whole statement on one line :) --- synapse/app/__init__.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index 1bc4279807..9c2b627590 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -16,13 +16,11 @@ import sys sys.dont_write_bytecode = True -from synapse.python_dependencies import ( - check_requirements, MissingRequirementError -) # NOQA +from synapse import python_dependencies # noqa: E402 try: - check_requirements() -except MissingRequirementError as e: + python_dependencies.check_requirements() +except python_dependencies.MissingRequirementError as e: message = "\n".join([ "Missing Requirement: %s" % (e.message,), "To install run:", -- cgit 1.4.1 From 33d777647325501d2a1d18d95efc5f9f64eeb46e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 13:32:15 +0100 Subject: Fix typo --- synapse/storage/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d9555e073a..7e7d32eb66 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -339,7 +339,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: token (str): The access token of a user. Returns: - defer.Deferred: None, if the token did not match, ootherwise dict + defer.Deferred: None, if the token did not match, otherwise dict including the keys `name`, `is_guest`, `device_id`, `token_id`. """ return self.runInteraction( -- cgit 1.4.1 From c824b29e77cd1745f8ac14f2a73c3b8590acaac9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 16:39:14 +0100 Subject: Check if the user is banned when handling 3pid invites --- synapse/api/auth.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index eca8513905..f399aa8c7c 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -376,6 +376,10 @@ class Auth(object): if Membership.INVITE == membership and "third_party_invite" in event.content: if not self._verify_third_party_invite(event, auth_events): raise AuthError(403, "You are not invited to this room.") + if target_banned: + raise AuthError( + 403, "%s is banned from the room" % (target_user_id,) + ) return True if Membership.JOIN != membership: -- cgit 1.4.1 From eb359eced44407b1ee9648f10fdf3df63c8d40ad Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 16:46:53 +0100 Subject: Add `create_requester` function Wrap the `Requester` constructor with a function which provides sensible defaults, and use it throughout --- synapse/api/auth.py | 24 +++++++++++------------- synapse/handlers/_base.py | 13 +++++++------ synapse/handlers/profile.py | 12 +++++++----- synapse/handlers/register.py | 16 +++++++++------- synapse/handlers/room_member.py | 20 +++++++++----------- synapse/rest/client/v2_alpha/keys.py | 10 ++++------ synapse/types.py | 33 ++++++++++++++++++++++++++++++++- tests/handlers/test_profile.py | 10 ++++++---- tests/replication/test_resource.py | 20 +++++++++++--------- tests/rest/client/v1/test_profile.py | 13 +++++-------- tests/utils.py | 5 ----- 11 files changed, 101 insertions(+), 75 deletions(-) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index eca8513905..eecf3b0b2a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -13,22 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +import pymacaroons from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json, SignatureVerifyException - from twisted.internet import defer +from unpaddedbase64 import decode_base64 +import synapse.types from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError -from synapse.types import Requester, UserID, get_domain_from_id -from synapse.util.logutils import log_function +from synapse.types import UserID, get_domain_from_id from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logutils import log_function from synapse.util.metrics import Measure -from unpaddedbase64 import decode_base64 - -import logging -import pymacaroons logger = logging.getLogger(__name__) @@ -566,8 +566,7 @@ class Auth(object): Args: request - An HTTP request with an access_token query parameter. Returns: - defer.Deferred: resolves to a namedtuple including "user" (UserID) - "access_token_id" (int), "is_guest" (bool) + defer.Deferred: resolves to a ``synapse.types.Requester`` object Raises: AuthError if no user by that token exists or the token is invalid. """ @@ -576,9 +575,7 @@ class Auth(object): user_id = yield self._get_appservice_user_id(request.args) if user_id: request.authenticated_entity = user_id - defer.returnValue( - Requester(UserID.from_string(user_id), "", False) - ) + defer.returnValue(synapse.types.create_requester(user_id)) access_token = request.args["access_token"][0] user_info = yield self.get_user_by_access_token(access_token, rights) @@ -612,7 +609,8 @@ class Auth(object): request.authenticated_entity = user.to_string() - defer.returnValue(Requester(user, token_id, is_guest)) + defer.returnValue(synapse.types.create_requester( + user, token_id, is_guest, device_id)) except KeyError: raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token.", diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6264aa0d9a..11081a0cd5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from synapse.api.errors import LimitExceededError +import synapse.types from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID, Requester - - -import logging +from synapse.api.errors import LimitExceededError +from synapse.types import UserID logger = logging.getLogger(__name__) @@ -124,7 +124,8 @@ class BaseHandler(object): # and having homeservers have their own users leave keeps more # of that decision-making and control local to the guest-having # homeserver. - requester = Requester(target_user, "", True) + requester = synapse.types.create_requester( + target_user, is_guest=True) handler = self.hs.get_handlers().room_member_handler yield handler.update_membership( requester, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 711a6a567f..d9ac09078d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -13,15 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer +import synapse.types from synapse.api.errors import SynapseError, AuthError, CodeMessageException -from synapse.types import UserID, Requester - +from synapse.types import UserID from ._base import BaseHandler -import logging - logger = logging.getLogger(__name__) @@ -165,7 +165,9 @@ class ProfileHandler(BaseHandler): try: # Assume the user isn't a guest because we don't let guests set # profile or avatar data. - requester = Requester(user, "", False) + # XXX why are we recreating `requester` here for each room? + # what was wrong with the `requester` we were passed? + requester = synapse.types.create_requester(user) yield handler.update_membership( requester, user, diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 94b19d0cb0..b9b5880d64 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -14,18 +14,19 @@ # limitations under the License. """Contains functions for registering clients.""" +import logging +import urllib + from twisted.internet import defer -from synapse.types import UserID, Requester +import synapse.types from synapse.api.errors import ( AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError ) -from ._base import BaseHandler -from synapse.util.async import run_on_reactor from synapse.http.client import CaptchaServerHttpClient - -import logging -import urllib +from synapse.types import UserID +from synapse.util.async import run_on_reactor +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -410,8 +411,9 @@ class RegistrationHandler(BaseHandler): if displayname is not None: logger.info("setting user display name: %s -> %s", user_id, displayname) profile_handler = self.hs.get_handlers().profile_handler + requester = synapse.types.create_requester(user) yield profile_handler.set_displayname( - user, Requester(user, token, False), displayname + user, requester, displayname ) defer.returnValue((user_id, token)) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7e616f44fd..8cec8fc4ed 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,24 +14,22 @@ # limitations under the License. -from twisted.internet import defer +import logging -from ._base import BaseHandler +from signedjson.key import decode_verify_key_bytes +from signedjson.sign import verify_signed_json +from twisted.internet import defer +from unpaddedbase64 import decode_base64 -from synapse.types import UserID, RoomID, Requester +import synapse.types from synapse.api.constants import ( EventTypes, Membership, ) from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room - -from signedjson.sign import verify_signed_json -from signedjson.key import decode_verify_key_bytes - -from unpaddedbase64 import decode_base64 - -import logging +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -315,7 +313,7 @@ class RoomMemberHandler(BaseHandler): ) assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) else: - requester = Requester(target_user, None, False) + requester = synapse.types.create_requester(target_user) message_handler = self.hs.get_handlers().message_handler prev_event = message_handler.deduplicate_state_event(event, context) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 89ab39491c..56364af337 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -13,18 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +import simplejson as json +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID - -from canonicaljson import encode_canonical_json - from ._base import client_v2_patterns -import logging -import simplejson as json - logger = logging.getLogger(__name__) diff --git a/synapse/types.py b/synapse/types.py index f639651a73..5349b0c450 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,38 @@ from synapse.api.errors import SynapseError from collections import namedtuple -Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) +Requester = namedtuple("Requester", + ["user", "access_token_id", "is_guest", "device_id"]) +""" +Represents the user making a request + +Attributes: + user (UserID): id of the user making the request + access_token_id (int|None): *ID* of the access token used for this + request, or None if it came via the appservice API or similar + is_guest (bool): True if the user making this request is a guest user + device_id (str|None): device_id which was set at authentication time +""" + + +def create_requester(user_id, access_token_id=None, is_guest=False, + device_id=None): + """ + Create a new ``Requester`` object + + Args: + user_id (str|UserID): id of the user making the request + access_token_id (int|None): *ID* of the access token used for this + request, or None if it came via the appservice API or similar + is_guest (bool): True if the user making this request is a guest user + device_id (str|None): device_id which was set at authentication time + + Returns: + Requester + """ + if not isinstance(user_id, UserID): + user_id = UserID.from_string(user_id) + return Requester(user_id, access_token_id, is_guest, device_id) def get_domain_from_id(string): diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 4f2c14e4ff..f1f664275f 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -19,11 +19,12 @@ from twisted.internet import defer from mock import Mock, NonCallableMock +import synapse.types from synapse.api.errors import AuthError from synapse.handlers.profile import ProfileHandler from synapse.types import UserID -from tests.utils import setup_test_homeserver, requester_for_user +from tests.utils import setup_test_homeserver class ProfileHandlers(object): @@ -86,7 +87,7 @@ class ProfileTestCase(unittest.TestCase): def test_set_my_name(self): yield self.handler.set_displayname( self.frank, - requester_for_user(self.frank), + synapse.types.create_requester(self.frank), "Frank Jr." ) @@ -99,7 +100,7 @@ class ProfileTestCase(unittest.TestCase): def test_set_my_name_noauth(self): d = self.handler.set_displayname( self.frank, - requester_for_user(self.bob), + synapse.types.create_requester(self.bob), "Frank Jr." ) @@ -144,7 +145,8 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_set_my_avatar(self): yield self.handler.set_avatar_url( - self.frank, requester_for_user(self.frank), "http://my.server/pic.gif" + self.frank, synapse.types.create_requester(self.frank), + "http://my.server/pic.gif" ) self.assertEquals( diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 842e3d29d7..e70ac6f14d 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -13,15 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.replication.resource import ReplicationResource -from synapse.types import Requester, UserID +import contextlib +import json +from mock import Mock, NonCallableMock from twisted.internet import defer + +import synapse.types +from synapse.replication.resource import ReplicationResource +from synapse.types import UserID from tests import unittest -from tests.utils import setup_test_homeserver, requester_for_user -from mock import Mock, NonCallableMock -import json -import contextlib +from tests.utils import setup_test_homeserver class ReplicationResourceCase(unittest.TestCase): @@ -61,7 +63,7 @@ class ReplicationResourceCase(unittest.TestCase): def test_events_and_state(self): get = self.get(events="-1", state="-1", timeout="0") yield self.hs.get_handlers().room_creation_handler.create_room( - Requester(self.user, "", False), {} + synapse.types.create_requester(self.user), {} ) code, body = yield get self.assertEquals(code, 200) @@ -144,7 +146,7 @@ class ReplicationResourceCase(unittest.TestCase): def send_text_message(self, room_id, message): handler = self.hs.get_handlers().message_handler event = yield handler.create_and_send_nonmember_event( - requester_for_user(self.user), + synapse.types.create_requester(self.user), { "type": "m.room.message", "content": {"body": "message", "msgtype": "m.text"}, @@ -157,7 +159,7 @@ class ReplicationResourceCase(unittest.TestCase): @defer.inlineCallbacks def create_room(self): result = yield self.hs.get_handlers().room_creation_handler.create_room( - Requester(self.user, "", False), {} + synapse.types.create_requester(self.user), {} ) defer.returnValue(result["room_id"]) diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index af02fce8fb..1e95e97538 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -14,17 +14,14 @@ # limitations under the License. """Tests REST events for /profile paths.""" -from tests import unittest -from twisted.internet import defer - from mock import Mock +from twisted.internet import defer -from ....utils import MockHttpResource, setup_test_homeserver - +import synapse.types from synapse.api.errors import SynapseError, AuthError -from synapse.types import Requester, UserID - from synapse.rest.client.v1 import profile +from tests import unittest +from ....utils import MockHttpResource, setup_test_homeserver myid = "@1234ABCD:test" PATH_PREFIX = "/_matrix/client/api/v1" @@ -52,7 +49,7 @@ class ProfileTestCase(unittest.TestCase): ) def _get_user_by_req(request=None, allow_guest=False): - return Requester(UserID.from_string(myid), "", False) + return synapse.types.create_requester(myid) hs.get_v1auth().get_user_by_req = _get_user_by_req diff --git a/tests/utils.py b/tests/utils.py index ed547bc39b..915b934e94 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,7 +20,6 @@ from synapse.storage.prepare_database import prepare_database from synapse.storage.engines import create_engine from synapse.server import HomeServer from synapse.federation.transport import server -from synapse.types import Requester from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.logcontext import LoggingContext @@ -512,7 +511,3 @@ class DeferredMockCallable(object): "call(%s)" % _format_call(c[0], c[1]) for c in calls ]) ) - - -def requester_for_user(user): - return Requester(user, None, False) -- cgit 1.4.1 From 87ffd21b291a503fd47ba938b32658c9f475aed5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 19:19:08 +0100 Subject: Fix a couple of bugs in the transaction and keyring code --- synapse/crypto/keyring.py | 17 +++++++++-------- synapse/storage/transactions.py | 3 ++- 2 files changed, 11 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d08ee0aa91..826845f695 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -275,14 +275,15 @@ class Keyring(object): for server_name, groups in missing_groups.items() } - for group in missing_groups.values(): - group_id_to_deferred[group.group_id].errback(SynapseError( - 401, - "No key for %s with id %s" % ( - group.server_name, group.key_ids, - ), - Codes.UNAUTHORIZED, - )) + for groups in missing_groups.values(): + for group in groups: + group_id_to_deferred[group.group_id].errback(SynapseError( + 401, + "No key for %s with id %s" % ( + group.server_name, group.key_ids, + ), + Codes.UNAUTHORIZED, + )) def on_err(err): for deferred in group_id_to_deferred.values(): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6c7481a728..6258ff1725 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -24,6 +24,7 @@ from collections import namedtuple import itertools import logging +import ujson as json logger = logging.getLogger(__name__) @@ -101,7 +102,7 @@ class TransactionStore(SQLBaseStore): ) if result and result["response_code"]: - return result["response_code"], result["response_json"] + return result["response_code"], json.loads(str(result["response_json"])) else: return None -- cgit 1.4.1 From a4b06b619c81f4a212323cc02565c7c893d5c2e5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 19:50:11 +0100 Subject: Add a couple more checks to the keyring --- synapse/crypto/keyring.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d08ee0aa91..627bd0d222 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -447,7 +447,7 @@ class Keyring(object): ) processed_response = yield self.process_v2_response( - perspective_name, response + perspective_name, response, only_from_server=False ) for server_name, response_keys in processed_response.items(): @@ -527,7 +527,7 @@ class Keyring(object): @defer.inlineCallbacks def process_v2_response(self, from_server, response_json, - requested_ids=[]): + requested_ids=[], only_from_server=True): time_now_ms = self.clock.time_msec() response_keys = {} verify_keys = {} @@ -551,6 +551,13 @@ class Keyring(object): results = {} server_name = response_json["server_name"] + if only_from_server: + if server_name != from_server: + raise ValueError( + "Expected a response for server %r not %r" % ( + from_server, server_name + ) + ) for key_id in response_json["signatures"].get(server_name, {}): if key_id not in response_json["verify_keys"]: raise ValueError( -- cgit 1.4.1 From 2e3d90d67c8255300b226d6d2fdc2acef80e58ba Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 23:38:12 +0100 Subject: Make the device id on e2e key upload optional We should now be able to get our device_id from the access_token, so the device_id on the upload request is optional. Where it is supplied, we should check that it matches. For active access_tokens without an associated device_id, we ought to register the device in the devices table. Also update the table on upgrade so that all of the existing e2e keys are associated with real devices. --- synapse/rest/client/v2_alpha/keys.py | 47 ++++++++++++++++------ .../schema/delta/33/devices_for_e2e_keys.sql | 19 +++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) create mode 100644 synapse/storage/schema/delta/33/devices_for_e2e_keys.sql (limited to 'synapse') diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 56364af337..0bf32a089b 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -19,6 +19,9 @@ import simplejson as json from canonicaljson import encode_canonical_json from twisted.internet import defer +import synapse.api.errors +import synapse.server +import synapse.types from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID from ._base import client_v2_patterns @@ -28,7 +31,7 @@ logger = logging.getLogger(__name__) class KeyUploadServlet(RestServlet): """ - POST /keys/upload/ HTTP/1.1 + POST /keys/upload HTTP/1.1 Content-Type: application/json { @@ -51,23 +54,51 @@ class KeyUploadServlet(RestServlet): }, } """ - PATTERNS = client_v2_patterns("/keys/upload/(?P[^/]*)", releases=()) + PATTERNS = client_v2_patterns("/keys/upload(/(?P[^/]+))?$", + releases=(), v2_alpha=False) def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ super(KeyUploadServlet, self).__init__() self.store = hs.get_datastore() self.clock = hs.get_clock() self.auth = hs.get_auth() + self.device_handler = hs.get_device_handler() @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() - # TODO: Check that the device_id matches that in the authentication - # or derive the device_id from the authentication instead. body = parse_json_object_from_request(request) + if device_id is not None: + # passing the device_id here is deprecated; however, we allow it + # for now for compatibility with older clients. But if a device_id + # was given here and in the auth, they must match. + + if (requester.device_id is not None and + device_id != requester.device_id): + raise synapse.api.errors.SynapseError( + 400, "Can only upload keys for current device" + ) + + self.device_handler.check_device_registered( + user_id, device_id, "unknown device" + ) + else: + device_id = requester.device_id + + if device_id is None: + raise synapse.api.errors.SynapseError( + 400, + "To upload keys, you must pass device_id when authenticating" + ) + time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. @@ -103,14 +134,6 @@ class KeyUploadServlet(RestServlet): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue((200, {"one_time_key_counts": result})) - @defer.inlineCallbacks - def on_GET(self, request, device_id): - requester = yield self.auth.get_user_by_req(request) - user_id = requester.user.to_string() - - result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - defer.returnValue((200, {"one_time_key_counts": result})) - class KeyQueryServlet(RestServlet): """ diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql new file mode 100644 index 0000000000..2908c4d232 --- /dev/null +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql @@ -0,0 +1,19 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- make sure that we have a device record for each set of E2E keys, so that the +-- user can delete them if they like. +INSERT INTO devices + SELECT user_id, device_id, "unknown device" FROM e2e_device_keys_json; -- cgit 1.4.1 From d47115ff8bf3ab5952f053db578a519e8e3f930c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Jul 2016 12:18:03 +0100 Subject: Delete e2e keys on device delete --- synapse/handlers/device.py | 4 ++++ synapse/rest/client/v2_alpha/keys.py | 13 +++++++++---- synapse/storage/end_to_end_keys.py | 15 +++++++++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index eaead50800..f4bf159bb5 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -143,6 +143,10 @@ class DeviceHandler(BaseHandler): delete_refresh_tokens=True, ) + yield self.store.delete_e2e_keys_by_device( + user_id=user_id, device_id=device_id + ) + @defer.inlineCallbacks def update_device(self, user_id, device_id, content): """ Update the given device diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 0bf32a089b..4629f4bfde 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -86,10 +86,6 @@ class KeyUploadServlet(RestServlet): raise synapse.api.errors.SynapseError( 400, "Can only upload keys for current device" ) - - self.device_handler.check_device_registered( - user_id, device_id, "unknown device" - ) else: device_id = requester.device_id @@ -131,6 +127,15 @@ class KeyUploadServlet(RestServlet): user_id, device_id, time_now, key_list ) + # the device should have been registered already, but it may have been + # deleted due to a race with a DELETE request. Or we may be using an + # old access_token without an associated device_id. Either way, we + # need to double-check the device is registered to avoid ending up with + # keys without a corresponding device. + self.device_handler.check_device_registered( + user_id, device_id, "unknown device" + ) + result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue((200, {"one_time_key_counts": result})) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 2e89066515..62b7790e91 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import twisted.internet.defer + from ._base import SQLBaseStore @@ -123,3 +125,16 @@ class EndToEndKeyStore(SQLBaseStore): return self.runInteraction( "claim_e2e_one_time_keys", _claim_e2e_one_time_keys ) + + @twisted.internet.defer.inlineCallbacks + def delete_e2e_keys_by_device(self, user_id, device_id): + yield self._simple_delete( + table="e2e_device_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_e2e_device_keys_by_device" + ) + yield self._simple_delete( + table="e2e_one_time_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_e2e_one_time_keys_by_device" + ) -- cgit 1.4.1 From 26cb0efa88c2fa84089c74e3de02fa2ce832f47a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Jul 2016 12:30:22 +0100 Subject: SQL syntax fix --- synapse/storage/schema/delta/33/devices_for_e2e_keys.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql index 2908c4d232..140f2b63e0 100644 --- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql @@ -16,4 +16,4 @@ -- make sure that we have a device record for each set of E2E keys, so that the -- user can delete them if they like. INSERT INTO devices - SELECT user_id, device_id, "unknown device" FROM e2e_device_keys_json; + SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json; -- cgit 1.4.1 From fe1b36994643ed57b511d9caf834e3e131cd404c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 27 Jul 2016 14:10:43 +0100 Subject: Clean up verify_json_objects_for_server --- synapse/crypto/keyring.py | 143 ++++++++++++++++++++++++---------------------- 1 file changed, 75 insertions(+), 68 deletions(-) (limited to 'synapse') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d08ee0aa91..f3924e23d8 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -44,7 +44,21 @@ import logging logger = logging.getLogger(__name__) -KeyGroup = namedtuple("KeyGroup", ("server_name", "group_id", "key_ids")) +VerifyKeyRequest = namedtuple("VerifyRequest", ( + "server_name", "key_ids", "json_object", "deferred" +)) +""" +A request for a verify key to verify a JSON object. + +Attributes: + server_name(str): The name of the server to verify against. + key_ids(set(str)): The set of key_ids to that could be used to verify the + JSON object + json_object(dict): The JSON object to verify. + deferred(twisted.internet.defer.Deferred): + A deferred (server_name, key_id, verify_key) tuple that resolves when + a verify key has been fetched +""" class Keyring(object): @@ -74,39 +88,32 @@ class Keyring(object): list of deferreds indicating success or failure to verify each json object's signature for the given server_name. """ - group_id_to_json = {} - group_id_to_group = {} - group_ids = [] - - next_group_id = 0 - deferreds = {} + verify_requests = [] for server_name, json_object in server_and_json: logger.debug("Verifying for %s", server_name) - group_id = next_group_id - next_group_id += 1 - group_ids.append(group_id) key_ids = signature_ids(json_object, server_name) if not key_ids: - deferreds[group_id] = defer.fail(SynapseError( + deferred = defer.fail(SynapseError( 400, "Not signed with a supported algorithm", Codes.UNAUTHORIZED, )) else: - deferreds[group_id] = defer.Deferred() + deferred = defer.Deferred() - group = KeyGroup(server_name, group_id, key_ids) + verify_request = VerifyKeyRequest( + server_name, key_ids, json_object, deferred + ) - group_id_to_group[group_id] = group - group_id_to_json[group_id] = json_object + verify_requests.append(verify_request) @defer.inlineCallbacks - def handle_key_deferred(group, deferred): - server_name = group.server_name + def handle_key_deferred(verify_request): + server_name = verify_request.server_name try: - _, _, key_id, verify_key = yield deferred + _, key_id, verify_key = yield verify_request.deferred except IOError as e: logger.warn( "Got IOError when downloading keys for %s: %s %s", @@ -128,7 +135,7 @@ class Keyring(object): Codes.UNAUTHORIZED, ) - json_object = group_id_to_json[group.group_id] + json_object = verify_request.json_object try: verify_signed_json(json_object, server_name, verify_key) @@ -157,36 +164,34 @@ class Keyring(object): # Actually start fetching keys. wait_on_deferred.addBoth( - lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) + lambda _: self.get_server_verify_keys(verify_requests) ) # When we've finished fetching all the keys for a given server_name, # resolve the deferred passed to `wait_for_previous_lookups` so that # any lookups waiting will proceed. - server_to_gids = {} + server_to_request_ids = {} - def remove_deferreds(res, server_name, group_id): - server_to_gids[server_name].discard(group_id) - if not server_to_gids[server_name]: + def remove_deferreds(res, server_name, verify_request): + request_id = id(verify_request) + server_to_request_ids[server_name].discard(request_id) + if not server_to_request_ids[server_name]: d = server_to_deferred.pop(server_name, None) if d: d.callback(None) return res - for g_id, deferred in deferreds.items(): - server_name = group_id_to_group[g_id].server_name - server_to_gids.setdefault(server_name, set()).add(g_id) - deferred.addBoth(remove_deferreds, server_name, g_id) + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + deferred.addBoth(remove_deferreds, server_name, verify_request) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified return [ - preserve_context_over_fn( - handle_key_deferred, - group_id_to_group[g_id], - deferreds[g_id], - ) - for g_id in group_ids + preserve_context_over_fn(handle_key_deferred, verify_request) + for verify_request in verify_requests ] @defer.inlineCallbacks @@ -220,7 +225,7 @@ class Keyring(object): d.addBoth(rm, server_name) - def get_server_verify_keys(self, group_id_to_group, group_id_to_deferred): + def get_server_verify_keys(self, verify_requests): """Takes a dict of KeyGroups and tries to find at least one key for each group. """ @@ -237,62 +242,64 @@ class Keyring(object): merged_results = {} missing_keys = {} - for group in group_id_to_group.values(): - missing_keys.setdefault(group.server_name, set()).update( - group.key_ids + for verify_request in verify_requests: + missing_keys.setdefault(verify_request.server_name, set()).update( + verify_request.key_ids ) for fn in key_fetch_fns: results = yield fn(missing_keys.items()) merged_results.update(results) - # We now need to figure out which groups we have keys for - # and which we don't - missing_groups = {} - for group in group_id_to_group.values(): - for key_id in group.key_ids: - if key_id in merged_results[group.server_name]: + # We now need to figure out which verify requests we have keys + # for and which we don't + missing_keys = {} + requests_missing_keys = [] + for verify_request in verify_requests: + server_name = verify_request.server_name + result_keys = merged_results[server_name] + + if verify_request.deferred.called: + # We've already called this deferred, which probably + # means that we've already found a key for it. + continue + + for key_id in verify_request.key_ids: + if key_id in result_keys: with PreserveLoggingContext(): - group_id_to_deferred[group.group_id].callback(( - group.group_id, - group.server_name, + verify_request.deferred.callback(( + server_name, key_id, - merged_results[group.server_name][key_id], + result_keys[key_id], )) break else: - missing_groups.setdefault( - group.server_name, [] - ).append(group) - - if not missing_groups: + # The else block is only reached if the loop above + # doesn't break. + missing_keys.setdefault(server_name, set()).update( + verify_request.key_ids + ) + requests_missing_keys.append(verify_request) + + if not missing_keys: break - missing_keys = { - server_name: set( - key_id for group in groups for key_id in group.key_ids - ) - for server_name, groups in missing_groups.items() - } - - for group in missing_groups.values(): - group_id_to_deferred[group.group_id].errback(SynapseError( + for verify_request in requests_missing_keys.values(): + verify_request.deferred.errback(SynapseError( 401, "No key for %s with id %s" % ( - group.server_name, group.key_ids, + verify_request.server_name, verify_request.key_ids, ), Codes.UNAUTHORIZED, )) def on_err(err): - for deferred in group_id_to_deferred.values(): - if not deferred.called: - deferred.errback(err) + for verify_request in verify_requests: + if not verify_request.deferred.called: + verify_request.deferred.errback(err) do_iterations().addErrback(on_err) - return group_id_to_deferred - @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): res = yield defer.gatherResults( -- cgit 1.4.1 From ccec25e2c6270c1cae916b8ca8a775a166ea7e7f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Jul 2016 16:41:06 +0100 Subject: key upload tweaks 1. Add v2_alpha URL back in, since things seem to be using it. 2. Don't reject the request if the device_id in the upload request fails to match that in the access_token. --- synapse/rest/client/v2_alpha/keys.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 4629f4bfde..dc1d4d8fc6 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -55,7 +55,7 @@ class KeyUploadServlet(RestServlet): } """ PATTERNS = client_v2_patterns("/keys/upload(/(?P[^/]+))?$", - releases=(), v2_alpha=False) + releases=()) def __init__(self, hs): """ @@ -78,14 +78,12 @@ class KeyUploadServlet(RestServlet): if device_id is not None: # passing the device_id here is deprecated; however, we allow it - # for now for compatibility with older clients. But if a device_id - # was given here and in the auth, they must match. - + # for now for compatibility with older clients. if (requester.device_id is not None and device_id != requester.device_id): - raise synapse.api.errors.SynapseError( - 400, "Can only upload keys for current device" - ) + logger.warning("Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, device_id) else: device_id = requester.device_id -- cgit 1.4.1 From 5238960850b4aa4b318f7c794fdadaf12dfe3841 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Jul 2016 17:33:09 +0100 Subject: Bump CHANGES and version --- CHANGES.rst | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++++ synapse/__init__.py | 2 +- 2 files changed, 57 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/CHANGES.rst b/CHANGES.rst index e1d5e876dc..799c14575c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,59 @@ +Changes in synapse v0.17.0-r1 (2016-07-27) +========================================== + +This release changes the LDAP configuration format in a backwards incompatible +way, see PR #843 for details. + + +Features: + +* Add purge_media_cache admin API (PR #902) +* Add deactivate account admin API (PR #903) +* Add optional pepper to password hashing (PR #907, #910) +* Add an admin option to shared secret registration (breaks backwards compat) + (PR #909) +* Add purge local room history API (PR #911, #923, #924) +* Add requestToken endpoints (PR #915) +* Add an /account/deactivate endpoint (PR #921) +* Add filter param to /messages. Add 'contains_url' to filter. (PR #922) +* Add device_id support to /login (PR #929) +* Add device_id support to /v2/register flow. (PR #937, #942) +* Add GET /devices endpoint (PR #939, #944) +* Add GET /device/{deviceId} (PR #943) +* Add update and delete APIs for devices (PR #949) + + +Changes: + +* Rewrite LDAP Authentication against ldap3 (PR #843) +* Linearize some federation endpoints based on (origin, room_id) (PR #879) +* Remove the legacy v0 content upload API. (PR #888) +* Use similar naming we use in email notifs for push (PR #894) +* Optionally include password hash in createUser endpoint (PR #905) +* Use a query that postgresql optimises better for get_events_around (PR #906) +* Fall back to 'username' if 'user' is not given for appservice registration. + (PR #927) +* Add metrics for psutil derived memory usage (PR #936) +* Record device_id in client_ips (PR #938) +* Log the hostname the reCAPTCHA was completed on (PR #946) + + +Bug fixes: + +* Fix substitution failure in mail template (PR #887) +* Put most recent 20 messages in email notif (PR #892) +* Ensure that the guest user is in the database when upgrading accounts + (PR #914) +* Fix various edge cases in auth handling (PR #919) +* Fix 500 ISE when sending alias event without a state_key (PR #925) +* Fix bug where we stored rejections in the state_group, persist all + rejections (PR #948) +* Fix lack of check of if the user is banned when handling 3pid invites + (PR #952) +* Fix a couple of bugs in the transaction and keyring code (PR #954, #955) + + + Changes in synapse v0.16.1-r1 (2016-07-08) ========================================== diff --git a/synapse/__init__.py b/synapse/__init__.py index 2750ad3f7a..b0bd7254c5 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.16.1-r1" +__version__ = "0.16.17" -- cgit 1.4.1 From fda078f995265adb0ecee5734c516eb55adc9355 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 28 Jul 2016 09:14:21 +0100 Subject: Add r0.2.0 to the "supported versions" list --- synapse/rest/client/versions.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index ca5468c402..1fe31abb42 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -26,7 +26,10 @@ class VersionsRestServlet(RestServlet): def on_GET(self, request): return (200, { - "versions": ["r0.0.1"] + "versions": [ + "r0.0.1", + "r0.2.0", + ] }) -- cgit 1.4.1 From ecd5e6bfa4b84b6beb47b27d476f0bdba66f7a23 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Jul 2016 10:04:37 +0100 Subject: Typo --- synapse/push/push_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 6f2d1ad57d..d555a33e9a 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -54,7 +54,7 @@ def get_context_for_event(state_handler, ev, user_id): room_state = yield state_handler.get_current_state(ev.room_id) # we no longer bother setting room_alias, and make room_name the - # human-readable name instead, be that m.room.namer, an alias or + # human-readable name instead, be that m.room.name, an alias or # a list of people in the room name = calculate_room_name( room_state, user_id, fallback_to_single_member=False -- cgit 1.4.1 From f6f8f81a4800cae83684cd1d75eb9a132c5bde6e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 28 Jul 2016 10:14:07 +0100 Subject: Add r0.1.0 to the "supported versions" list --- synapse/rest/client/versions.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse') diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 1fe31abb42..e984ea47db 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -28,6 +28,7 @@ class VersionsRestServlet(RestServlet): return (200, { "versions": [ "r0.0.1", + "r0.1.0", "r0.2.0", ] }) -- cgit 1.4.1 From 389c890f14c456a157d973fd29b49d64e5fa9226 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Jul 2016 10:20:47 +0100 Subject: Don't include name of room for invites in push Avoids insane pushes like, "Bob invited you to invite from Bob" --- synapse/util/presentable_names.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py index 4c54812e6f..f68676e9e7 100644 --- a/synapse/util/presentable_names.py +++ b/synapse/util/presentable_names.py @@ -83,7 +83,10 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True, ): if ("m.room.member", my_member_event.sender) in room_state: inviter_member_event = room_state[("m.room.member", my_member_event.sender)] - return "Invite from %s" % (name_from_member_event(inviter_member_event),) + if fallback_to_single_member: + return "Invite from %s" % (name_from_member_event(inviter_member_event),) + else: + return None else: return "Room Invite" -- cgit 1.4.1 From 7871790db1b38d10783d88ebfc9bd4e0356195c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 10:38:56 +0100 Subject: Bump version and changelog --- CHANGES.rst | 7 +++++-- synapse/__init__.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/CHANGES.rst b/CHANGES.rst index 799c14575c..65566adda1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,5 +1,5 @@ -Changes in synapse v0.17.0-r1 (2016-07-27) -========================================== +Changes in synapse v0.17.0-rc1 (2016-07-28) +=========================================== This release changes the LDAP configuration format in a backwards incompatible way, see PR #843 for details. @@ -36,6 +36,9 @@ Changes: * Add metrics for psutil derived memory usage (PR #936) * Record device_id in client_ips (PR #938) * Log the hostname the reCAPTCHA was completed on (PR #946) +* Make the device id on e2e key upload optional (PR #956) +* Add r0.2.0 to the "supported versions" list (PR #960) +* Don't include name of room for invites in push (PR #961) Bug fixes: diff --git a/synapse/__init__.py b/synapse/__init__.py index b0bd7254c5..8f0176e182 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.16.17" +__version__ = "0.17.0-rc1" -- cgit 1.4.1 From bf81e38d365b79130b5e04053de0eaff94b0d472 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Jul 2016 10:29:12 +0100 Subject: Fix retry utils to check if the exception is a subclass of CME --- synapse/util/retryutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 43cf11f3f6..49527f4d21 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -128,7 +128,7 @@ class RetryDestinationLimiter(object): ) valid_err_code = False - if exc_type is CodeMessageException: + if exc_type is not None and issubclass(exc_type, CodeMessageException): valid_err_code = 0 <= exc_val.code < 500 if exc_type is None or valid_err_code: -- cgit 1.4.1 From 370135ad0b7cf7ded04e9f2ca0c99f5470f5efc1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 28 Jul 2016 16:47:37 +0100 Subject: Comment get_unread_push_actions_for_user_in_range function --- synapse/storage/event_push_actions.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3d93285f84..958dbcc22b 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -119,9 +119,28 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def get_unread_push_actions_for_user_in_range(self, user_id, min_stream_ordering, - max_stream_ordering=None, + max_stream_ordering, limit=20): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. + + Args: + user_id (str) + min_stream_ordering + max_stream_ordering + limit (int) + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions", "received_ts". + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the most recent + # push actions def get_after_receipt(txn): + # XXX: Do we really need to GROUP BY user_id on the inner SELECT? + # XXX: NATURAL JOIN obfuscates which columns are being joined on the + # inner SELECT (the room_id and event_id), can we + # INNER JOIN ... USING instead? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " "e.received_ts " @@ -160,7 +179,12 @@ class EventPushActionsStore(SQLBaseStore): "get_unread_push_actions_for_user_in_range", get_after_receipt ) + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. def get_no_receipt(txn): + # XXX: Does the inner SELECT really need to select from the events table? + # We're just extracting the room_id, so isn't receipts_linearized enough? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," " e.received_ts" @@ -198,7 +222,7 @@ class EventPushActionsStore(SQLBaseStore): # Now sort it so it's ordered correctly, since currently it will # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered - # by received_ts + # by received_ts (most recent first) notifs.sort(key=lambda r: -(r['received_ts'] or 0)) # Now return the first `limit` -- cgit 1.4.1 From 0a7d3cd00f8b7e3ad0ba458c3ab9b40a2496545b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Jul 2016 20:24:24 +0100 Subject: Create separate methods for getting messages to push for the email and http pushers rather than trying to make a single method that will work with their conflicting requirements. The http pusher needs to get the messages in ascending stream order, and doesn't want to miss a message. The email pusher needs to get the messages in descending timestamp order, and doesn't mind if it misses messages. --- synapse/push/emailpusher.py | 5 +- synapse/push/httppusher.py | 3 +- synapse/replication/slave/storage/events.py | 7 +- synapse/storage/event_push_actions.py | 199 +++++++++++++++++++++------- tests/storage/test_event_push_actions.py | 41 ++++++ 5 files changed, 204 insertions(+), 51 deletions(-) create mode 100644 tests/storage/test_event_push_actions.py (limited to 'synapse') diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 12a3ec7fd8..e224b68291 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -140,9 +140,8 @@ class EmailPusher(object): being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( - self.user_id, start, self.max_stream_ordering - ) + fn = self.store.get_unread_push_actions_for_user_in_range_for_email + unprocessed = yield fn(self.user_id, start, self.max_stream_ordering) soonest_due_at = None diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 2acc6cc214..9a7db61220 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -141,7 +141,8 @@ class HttpPusher(object): run once per pusher. """ - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( + fn = self.store.get_unread_push_actions_for_user_in_range_for_http + unprocessed = yield fn( self.user_id, self.last_stream_ordering, self.max_stream_ordering ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 369d839464..6a644f1386 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -93,8 +93,11 @@ class SlavedEventStore(BaseSlavedStore): StreamStore.__dict__["get_recent_event_ids_for_room"] ) - get_unread_push_actions_for_user_in_range = ( - DataStore.get_unread_push_actions_for_user_in_range.__func__ + get_unread_push_actions_for_user_in_range_for_http = ( + DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__ + ) + get_unread_push_actions_for_user_in_range_for_email = ( + DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__ ) get_push_action_users_in_range = ( DataStore.get_push_action_users_in_range.__func__ diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 958dbcc22b..5ab362bef2 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -117,40 +117,149 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks - def get_unread_push_actions_for_user_in_range(self, user_id, - min_stream_ordering, - max_stream_ordering, - limit=20): + def get_unread_push_actions_for_user_in_range_for_http( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): """Get a list of the most recent unread push actions for a given user, - within the given stream ordering range. + within the given stream ordering range. Called by the httppusher. Args: - user_id (str) - min_stream_ordering - max_stream_ordering - limit (int) + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions". + The list will be ordered by ascending stream_ordering. + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the next + # push actions + def get_after_receipt(txn): + # find rooms that have a read receipt in them and return the next + # push actions + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + ") AS rl," + " event_push_actions AS ep" + " WHERE" + " ep.room_id = rl.room_id" + " AND (" + " ep.topological_ordering > rl.topological_ordering" + " OR (" + " ep.topological_ordering = rl.topological_ordering" + " AND ep.stream_ordering > rl.stream_ordering" + " )" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + after_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt + ) + + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. + def get_no_receipt(txn): + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + no_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt + ) + + notifs = [ + { + "event_id": row[0], + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + } for row in after_read_receipt + no_read_receipt + ] + + # Now sort it so it's ordered correctly, since currently it will + # contain results from the first query, correctly ordered, followed + # by results from the second query, but we want them all ordered + # by stream_ordering, oldest first. + notifs.sort(key=lambda r: r['stream_ordering']) + + # Take only up to the limit. We have to stop at the limit because + # one of the subqueries may have hit the limit. + defer.returnValue(notifs[:limit]) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range_for_email( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. Called by the emailpusher + + Args: + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. Returns: A promise which resolves to a list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions", "received_ts". + The list will be ordered by descending received_ts. The list will have between 0~limit entries. """ # find rooms that have a read receipt in them and return the most recent # push actions def get_after_receipt(txn): - # XXX: Do we really need to GROUP BY user_id on the inner SELECT? - # XXX: NATURAL JOIN obfuscates which columns are being joined on the - # inner SELECT (the room_id and event_id), can we - # INNER JOIN ... USING instead? sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " - "e.received_ts " - "FROM (" - " SELECT room_id, user_id, " - " max(topological_ordering) as topological_ordering, " - " max(stream_ordering) as stream_ordering " - " FROM events" - " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" - " GROUP BY room_id, user_id" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" ") AS rl," " event_push_actions AS ep" " INNER JOIN events AS e USING (room_id, event_id)" @@ -165,47 +274,47 @@ class EventPushActionsStore(SQLBaseStore): " )" " AND ep.stream_ordering > ?" " AND ep.user_id = ?" - " AND ep.user_id = rl.user_id" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" ) - args = [min_stream_ordering, user_id] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_after_receipt + "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt ) # There are rooms with push actions in them but you don't have a read receipt in # them e.g. rooms you've been invited to, so get push actions for rooms which do # not have read receipts in them too. def get_no_receipt(txn): - # XXX: Does the inner SELECT really need to select from the events table? - # We're just extracting the room_id, so isn't receipts_linearized enough? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," " e.received_ts" " FROM event_push_actions AS ep" - " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" - " WHERE ep.room_id not in (" - " SELECT room_id FROM events NATURAL JOIN receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ?" - " GROUP BY room_id" - ") AND ep.user_id = ? AND ep.stream_ordering > ?" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" ) - args = [user_id, user_id, min_stream_ordering] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() no_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_no_receipt + "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt ) # Make a list of dicts from the two sets of results. diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py new file mode 100644 index 0000000000..e9044afa2e --- /dev/null +++ b/tests/storage/test_event_push_actions.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import tests.unittest +import tests.utils + +USER_ID = "@user:example.com" + + +class EventPushActionsStoreTestCase(tests.unittest.TestCase): + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_get_unread_push_actions_for_user_in_range_for_http(self): + yield self.store.get_unread_push_actions_for_user_in_range_for_http( + USER_ID, 0, 1000, 20 + ) + + @defer.inlineCallbacks + def test_get_unread_push_actions_for_user_in_range_for_email(self): + yield self.store.get_unread_push_actions_for_user_in_range_for_email( + USER_ID, 0, 1000, 20 + ) -- cgit 1.4.1 From 8dad08a9509103f38d9eec5dc28d46e4a757fad8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 29 Jul 2016 09:57:13 +0100 Subject: Fix SQL to supply arguments in the same order --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5ab362bef2..df4000d0da 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -272,8 +272,8 @@ class EventPushActionsStore(SQLBaseStore): " AND ep.stream_ordering > rl.stream_ordering" " )" " )" - " AND ep.stream_ordering > ?" " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) -- cgit 1.4.1