From be09c23ff02bee9c63611df528a269fb157f2f3c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2015 15:40:07 +0000 Subject: Add txn_id kwarg to push methods --- synapse/appservice/api.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c2179f8d55..c17fb219c5 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -72,11 +72,16 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(False) @defer.inlineCallbacks - def push_bulk(self, service, events): + def push_bulk(self, service, events, txn_id=None): events = self._serialize(events) + if txn_id is None: + logger.warning("push_bulk: Missing txn ID sending events to %s", + service.url) + txn_id = str(0) + uri = service.url + ("/transactions/%s" % - urllib.quote(str(0))) # TODO txn_ids + urllib.quote(txn_id)) response = None try: response = yield self.put_json( @@ -97,8 +102,8 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(False) @defer.inlineCallbacks - def push(self, service, event): - response = yield self.push_bulk(service, [event]) + def push(self, service, event, txn_id=None): + response = yield self.push_bulk(service, [event], txn_id) defer.returnValue(response) def _serialize(self, events): -- cgit 1.5.1 From e3190711911f166c5599acb66929f222498b212a Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2015 16:30:33 +0000 Subject: Add stub scheduler module for txn reliability --- synapse/appservice/scheduler.py | 68 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 synapse/appservice/scheduler.py (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py new file mode 100644 index 0000000000..a5060808d3 --- /dev/null +++ b/synapse/appservice/scheduler.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +This module controls the reliability for application service transactions. + +The nominal flow through this module looks like: + ___________ + \O/ --- event -->| | +--------------+ + | - event ---->| EventPool |<-- poll 1/s for events ---| EventSorter | + / \ ---- event ->|___________| +--------------+ + USERS ____________________________| + | | | + V V V + ASa ASb ASc + [e,e] [e] [e,e,e] + | + V + -````````- +------------+ + |````````|<--StoreTxn-|Transaction | + |Database| | Maker |---> SEND TO AS + `--------` +------------+ +What happens on SEND TO AS depends on the state of the Application Service: + - If the AS is marked as DOWN, do nothing. + - If the AS is marked as UP, send the transaction. + * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn + contents from the db. + * FAILURE : Marked AS as DOWN and start Recoverer. + +Recoverer attempts to recover ASes who have died. The flow for this looks like: + ,--------------------- backoff++ --------------. + V | + START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE + backoff DB and try to send it + ^ |__________ +Mark AS as | V +UP & quit +---------- YES SUCCESS + | | | + NO <--- Have more txns? <------ Mark txn success & nuke -+ + from db; incr AS pos. +""" + + +class EventPool(object): + pass + + +class EventSorter(object): + pass + + +class TransactionMaker(object): + pass + + +class Recoverer(object): + pass -- cgit 1.5.1 From 773cb3b6880851f318147b59f16c2c882d280a6e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2015 17:35:07 +0000 Subject: Add stub architecture for txn reliability. --- synapse/appservice/scheduler.py | 121 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 10 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index a5060808d3..3162fbec11 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -18,7 +18,7 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: ___________ \O/ --- event -->| | +--------------+ - | - event ---->| EventPool |<-- poll 1/s for events ---| EventSorter | + | - event ---->| event_pool|<-- poll 1/s for events ---| EventSorter | / \ ---- event ->|___________| +--------------+ USERS ____________________________| | | | @@ -29,7 +29,7 @@ The nominal flow through this module looks like: V -````````- +------------+ |````````|<--StoreTxn-|Transaction | - |Database| | Maker |---> SEND TO AS + |Database| | Controller |---> SEND TO AS `--------` +------------+ What happens on SEND TO AS depends on the state of the Application Service: - If the AS is marked as DOWN, do nothing. @@ -49,20 +49,121 @@ UP & quit +---------- YES SUCCESS | | | NO <--- Have more txns? <------ Mark txn success & nuke -+ from db; incr AS pos. + +This is all tied together by the AppServiceScheduler which DIs the required +components. """ -class EventPool(object): - pass +class AppServiceScheduler(object): + """ Public facing API for this module. Does the required DI to tie the + components together. This also serves as the "event_pool", which in this + case is a simple array. + """ + + def __init__(self, store, as_api, services): + self.app_services = services + self.event_pool = [] + + def create_recoverer(service): + return _Recoverer(store, as_api, service) + self.txn_ctrl = _TransactionController(store, as_api, create_recoverer) + + self.event_sorter = _EventSorter(self, self.txn_ctrl, services) + + def start(self): + self.event_sorter.start_polling() + + def store_event(self, event): # event_pool + self.event_pool.append(event) + + def get_events(self): # event_pool + return self.event_pool + + +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + # sends this transaction using this as_api + pass + + def complete(self, store): + # increment txn id on AS and nuke txn contents from db + pass + + +class _EventSorter(object): + + def __init__(self, event_pool, txn_ctrl, services): + self.event_pool = event_pool + self.txn_ctrl = txn_ctrl + self.services = services + + def start_polling(self): + events = self.event_pool.get_events() + if events: + self._process(events) + # repoll later on + + def _process(self, events): + # sort events + # f.e. (AS, events) => poke transaction controller + pass + + +class _TransactionController(object): + + def __init__(self, store, as_api, recoverer_fn): + self.store = store + self.as_api = as_api + self.recoverer_fn = recoverer_fn + + def on_receive_events(self, service, events): + txn = self._store_txn(service, events) + if txn.send(self.as_api): + txn.complete(self.store) + else: + self._start_recoverer(service) + + def _start_recoverer(self, service): + recoverer = self.recoverer_fn(service) + recoverer.recover() + + def _store_txn(self, service, events): + pass # returns AppServiceTransaction + + +class _Recoverer(object): + def __init__(self, store, as_api, service): + self.store = store + self.as_api = as_api + self.service = service + self.backoff_counter = 1 -class EventSorter(object): - pass + def recover(self): + # TODO wait a bit + txn = self._get_oldest_txn() + if txn: + if txn.send(self.as_api): + txn.complete(self.store) + else: + self.backoff_counter += 1 + self.recover(self.service) + return + else: + self._set_service_recovered(self.service) + def _set_service_recovered(self, service): + pass -class TransactionMaker(object): - pass + def _get_oldest_txn(self): + pass # returns AppServiceTransaction -class Recoverer(object): - pass -- cgit 1.5.1 From 0c838f9f5ecec5c0f93d194a00fb82d3877c2c09 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2015 17:45:52 +0000 Subject: Minor tweaks --- synapse/appservice/scheduler.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3162fbec11..27271e468d 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,6 +49,7 @@ UP & quit +---------- YES SUCCESS | | | NO <--- Have more txns? <------ Mark txn success & nuke -+ from db; incr AS pos. + Reset backoff. This is all tied together by the AppServiceScheduler which DIs the required components. @@ -77,7 +78,7 @@ class AppServiceScheduler(object): def store_event(self, event): # event_pool self.event_pool.append(event) - def get_events(self): # event_pool + def drain_events(self): # event_pool return self.event_pool @@ -90,11 +91,11 @@ class AppServiceTransaction(object): self.events = events def send(self, as_api): - # sends this transaction using this as_api + # TODO sends this transaction using this as_api pass def complete(self, store): - # increment txn id on AS and nuke txn contents from db + # TODO increment txn id on AS and nuke txn contents from db pass @@ -106,14 +107,14 @@ class _EventSorter(object): self.services = services def start_polling(self): - events = self.event_pool.get_events() + events = self.event_pool.drain_events() if events: self._process(events) - # repoll later on + # TODO repoll later on def _process(self, events): - # sort events - # f.e. (AS, events) => poke transaction controller + # TODO sort events + # TODO fe (AS, events) => poke transaction controller on_receive_events pass @@ -153,6 +154,7 @@ class _Recoverer(object): if txn: if txn.send(self.as_api): txn.complete(self.store) + self.backoff_counter = 1 else: self.backoff_counter += 1 self.recover(self.service) -- cgit 1.5.1 From d516d68b293448e686fe30c58d69e030e61ec955 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 10:25:50 +0000 Subject: Rejig structure given the appservice_handler already filters the correct ASes to use. --- synapse/appservice/scheduler.py | 144 ++++++++++++++++++++++++---------------- 1 file changed, 85 insertions(+), 59 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 27271e468d..19fe8e11e8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -16,17 +16,11 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: - ___________ - \O/ --- event -->| | +--------------+ - | - event ---->| event_pool|<-- poll 1/s for events ---| EventSorter | - / \ ---- event ->|___________| +--------------+ - USERS ____________________________| - | | | - V V V - ASa ASb ASc - [e,e] [e] [e,e,e] - | - V + _________ +---ASa[e]-->| Event | +----ASb[e]->| Grouper |<-poll 1/s--+ +--ASa[e]--->|_________| | ASa[e,e] ASb[e] + V -````````- +------------+ |````````|<--StoreTxn-|Transaction | |Database| | Controller |---> SEND TO AS @@ -43,11 +37,11 @@ Recoverer attempts to recover ASes who have died. The flow for this looks like: V | START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE backoff DB and try to send it - ^ |__________ -Mark AS as | V -UP & quit +---------- YES SUCCESS - | | | - NO <--- Have more txns? <------ Mark txn success & nuke -+ + ^ |___________ +Mark AS as | V +UP & quit +---------- YES SUCCESS + | | | + NO <--- Have more txns? <------ Mark txn success & nuke <-+ from db; incr AS pos. Reset backoff. @@ -62,24 +56,28 @@ class AppServiceScheduler(object): case is a simple array. """ - def __init__(self, store, as_api, services): - self.app_services = services - self.event_pool = [] + def __init__(self, clock, store, as_api): + self.clock = clock + self.store = store + self.as_api = as_api + self.event_grouper = _EventGrouper() - def create_recoverer(service): - return _Recoverer(store, as_api, service) - self.txn_ctrl = _TransactionController(store, as_api, create_recoverer) + def create_recoverer(service, callback): + return _Recoverer(clock, store, as_api, service, callback) - self.event_sorter = _EventSorter(self, self.txn_ctrl, services) + self.txn_ctrl = _TransactionController( + clock, store, as_api, self.event_grouper, create_recoverer + ) def start(self): - self.event_sorter.start_polling() - - def store_event(self, event): # event_pool - self.event_pool.append(event) + # check for any DOWN ASes and start recoverers for them. + _Recoverer.start( + self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + ) + self.txn_ctrl.start_polling() - def drain_events(self): # event_pool - return self.event_pool + def submit_event_for_as(self, service, event): + self.event_grouper.on_receive(service, event) class AppServiceTransaction(object): @@ -99,71 +97,99 @@ class AppServiceTransaction(object): pass -class _EventSorter(object): +class _EventGrouper(object): + """Groups events for the same application service together. + """ - def __init__(self, event_pool, txn_ctrl, services): - self.event_pool = event_pool - self.txn_ctrl = txn_ctrl - self.services = services + def __init__(self): + self.groups = {} # dict of {service: [events]} - def start_polling(self): - events = self.event_pool.drain_events() - if events: - self._process(events) - # TODO repoll later on - - def _process(self, events): - # TODO sort events - # TODO fe (AS, events) => poke transaction controller on_receive_events + def on_receive(self, service, event): + # TODO group this pass + def drain_groups(self): + return self.groups + class _TransactionController(object): - def __init__(self, store, as_api, recoverer_fn): + def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): + self.clock = clock self.store = store self.as_api = as_api + self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn - def on_receive_events(self, service, events): - txn = self._store_txn(service, events) - if txn.send(self.as_api): - txn.complete(self.store) - else: - self._start_recoverer(service) + def start_polling(self): + groups = self.event_grouper.drain_groups() + for service in groups: + txn_id = self._get_next_txn_id(service) + txn = AppServiceTransaction(service, txn_id, groups[service]) + self._store_txn(txn) + if self._is_service_up(service): + if txn.send(self.as_api): + txn.complete(self.store) + else: + # TODO mark AS as down + self._start_recoverer(service) + self.clock.call_later(1000, self.start_polling) + + + def on_recovered(self, service): + # TODO mark AS as UP + pass def _start_recoverer(self, service): - recoverer = self.recoverer_fn(service) + recoverer = self.recoverer_fn(service, self.on_recovered) recoverer.recover() - def _store_txn(self, service, events): - pass # returns AppServiceTransaction + def _is_service_up(self, service): + pass + + def _get_next_txn_id(self, service): + pass # TODO work out the next txn_id for this service + + def _store_txn(self, txn): + pass class _Recoverer(object): - def __init__(self, store, as_api, service): + @staticmethod + def start(clock, store, as_api, callback): + # TODO check for DOWN ASes and init recoverers + pass + + def __init__(self, clock, store, as_api, service, callback): + self.clock = clock self.store = store self.as_api = as_api self.service = service + self.callback = callback self.backoff_counter = 1 def recover(self): - # TODO wait a bit + self.clock.call_later(2000 ** self.backoff_counter, self.retry) + + def retry(self): txn = self._get_oldest_txn() if txn: if txn.send(self.as_api): txn.complete(self.store) + # reset the backoff counter and retry immediately self.backoff_counter = 1 + self.retry() + return else: self.backoff_counter += 1 - self.recover(self.service) + self.recover() return else: - self._set_service_recovered(self.service) + self._set_service_recovered() - def _set_service_recovered(self, service): - pass + def _set_service_recovered(self): + self.callback(self.service) def _get_oldest_txn(self): pass # returns AppServiceTransaction -- cgit 1.5.1 From 192e228a98f3700f48d7fd136f4dce2979ec7c90 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 11:50:27 +0000 Subject: Start adding some tests --- synapse/appservice/scheduler.py | 23 ++++---- tests/appservice/test_scheduler.py | 106 +++++++++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+), 10 deletions(-) create mode 100644 tests/appservice/test_scheduler.py (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 19fe8e11e8..754f39381f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,6 +49,8 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ +from twisted.internet import defer + class AppServiceScheduler(object): """ Public facing API for this module. Does the required DI to tie the @@ -105,11 +107,14 @@ class _EventGrouper(object): self.groups = {} # dict of {service: [events]} def on_receive(self, service, event): - # TODO group this - pass + if service not in self.groups: + self.groups[service] = [] + self.groups[service].append(event) def drain_groups(self): - return self.groups + groups = self.groups + self.groups = {} + return groups class _TransactionController(object): @@ -135,7 +140,6 @@ class _TransactionController(object): self._start_recoverer(service) self.clock.call_later(1000, self.start_polling) - def on_recovered(self, service): # TODO mark AS as UP pass @@ -172,26 +176,25 @@ class _Recoverer(object): def recover(self): self.clock.call_later(2000 ** self.backoff_counter, self.retry) + @defer.inlineCallbacks def retry(self): - txn = self._get_oldest_txn() + txn = yield self._get_oldest_txn() if txn: if txn.send(self.as_api): txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 self.retry() - return else: self.backoff_counter += 1 self.recover() - return else: self._set_service_recovered() def _set_service_recovered(self): self.callback(self.service) + @defer.inlineCallbacks def _get_oldest_txn(self): - pass # returns AppServiceTransaction - - + txn = yield self.store.get_oldest_txn(self.service) + defer.returnValue(txn) diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py new file mode 100644 index 0000000000..b41d4358cf --- /dev/null +++ b/tests/appservice/test_scheduler.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.appservice.scheduler import ( + AppServiceScheduler, AppServiceTransaction, _EventGrouper, + _TransactionController, _Recoverer +) +from twisted.internet import defer +from ..utils import MockClock +from mock import Mock +from tests import unittest + +class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.as_api = Mock() + self.store = Mock() + self.service = Mock() + self.callback = Mock() + self.recoverer = _Recoverer( + clock=self.clock, + as_api=self.as_api, + store=self.store, + service=self.service, + callback=self.callback, + ) + + def test_recover_service_single_txn(self): + txns = self._mk_txns(1) + self.store.get_oldest_txn = Mock(return_value=defer.succeed(txns[0])) + + self.recoverer.recover() + self.assertEquals(0, self.store.get_oldest_txn.call_count) + self.clock.advance_time(2000) + self.assertEquals(2, self.store.get_oldest_txn.call_count) + + def _mk_txns(self, num_txns): + return [ + Mock() for i in range(num_txns) + ] + + + +class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): + + def setUp(self): + self.grouper = _EventGrouper() + + def test_drain_single_event(self): + service = Mock() + event = Mock() + self.grouper.on_receive(service, event) + groups = self.grouper.drain_groups() + self.assertTrue(service in groups) + self.assertEquals([event], groups[service]) + self.assertEquals(1, len(groups.keys())) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) + + def test_drain_multiple_events(self): + service = Mock() + events = [Mock(), Mock(), Mock()] + for e in events: + self.grouper.on_receive(service, e) + groups = self.grouper.drain_groups() + self.assertTrue(service in groups) + self.assertEquals(events, groups[service]) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) + + def test_drain_multiple_services(self): + services = [Mock(), Mock(), Mock()] + events_a = [Mock(), Mock()] + events_b = [Mock()] + events_c = [Mock(), Mock(), Mock(), Mock()] + mappings = { + services[0]: events_a, + services[1]: events_b, + services[2]: events_c + } + for e in events_b: + self.grouper.on_receive(services[1], e) + for e in events_c: + self.grouper.on_receive(services[2], e) + for e in events_a: + self.grouper.on_receive(services[0], e) + + groups = self.grouper.drain_groups() + for service in services: + self.assertTrue(service in groups) + self.assertEquals(mappings[service], groups[service]) + self.assertEquals(3, len(groups.keys())) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) -- cgit 1.5.1 From 0fbfe1b08a791e95dc9e9d417f131e80b4ce8059 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 14:36:52 +0000 Subject: Add more tests; fix bugs. --- synapse/appservice/scheduler.py | 4 +-- tests/appservice/test_scheduler.py | 54 +++++++++++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 754f39381f..f54df9c9a5 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -174,7 +174,7 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later(2000 ** self.backoff_counter, self.retry) + self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry) @defer.inlineCallbacks def retry(self): @@ -184,7 +184,7 @@ class _Recoverer(object): txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 - self.retry() + yield self.retry() else: self.backoff_counter += 1 self.recover() diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index b41d4358cf..1e3eb9e1cc 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -21,6 +21,7 @@ from ..utils import MockClock from mock import Mock from tests import unittest + class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def setUp(self): @@ -37,21 +38,60 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): callback=self.callback, ) - def test_recover_service_single_txn(self): - txns = self._mk_txns(1) - self.store.get_oldest_txn = Mock(return_value=defer.succeed(txns[0])) + def test_recover_single_txn(self): + txn = Mock() + # return one txn to send, then no more old txns + txns = [txn, None] + + def take_txn(*args, **kwargs): + return defer.succeed(txns.pop(0)) + self.store.get_oldest_txn = Mock(side_effect=take_txn) self.recoverer.recover() + # shouldn't have called anything prior to waiting for exp backoff self.assertEquals(0, self.store.get_oldest_txn.call_count) + txn.send = Mock(return_value=True) + # wait for exp backoff self.clock.advance_time(2000) + self.assertEquals(1, txn.send.call_count) + self.assertEquals(1, txn.complete.call_count) + # 2 because it needs to get None to know there are no more txns self.assertEquals(2, self.store.get_oldest_txn.call_count) + self.assertEquals(1, self.callback.call_count) - def _mk_txns(self, num_txns): - return [ - Mock() for i in range(num_txns) - ] + def test_recover_retry_txn(self): + txn = Mock() + txns = [txn, None] + pop_txn = False + def take_txn(*args, **kwargs): + if pop_txn: + return defer.succeed(txns.pop(0)) + else: + return defer.succeed(txn) + self.store.get_oldest_txn = Mock(side_effect=take_txn) + self.recoverer.recover() + self.assertEquals(0, self.store.get_oldest_txn.call_count) + txn.send = Mock(return_value=False) + self.clock.advance_time(2000) + self.assertEquals(1, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + self.clock.advance_time(4000) + self.assertEquals(2, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + self.clock.advance_time(8000) + self.assertEquals(3, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + txn.send = Mock(return_value=True) # successfully send the txn + pop_txn = True # returns the txn the first time, then no more. + self.clock.advance_time(16000) + self.assertEquals(1, txn.send.call_count) # new mock reset call count + self.assertEquals(1, txn.complete.call_count) + self.assertEquals(1, self.callback.call_count) class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): -- cgit 1.5.1 From 141ec04d194c57f29756d6ccbda3f396cc3aa9e7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 14:53:35 +0000 Subject: Add stub ApplicationServiceTransactionStore. Bootstrap Recoverers. Fill in stub Transaction functions. --- synapse/appservice/scheduler.py | 50 +++++++++++++++++++++++++++++++++++------ synapse/storage/appservice.py | 28 +++++++++++++++++++++++ 2 files changed, 71 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index f54df9c9a5..645d7bf6b2 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -71,11 +71,13 @@ class AppServiceScheduler(object): clock, store, as_api, self.event_grouper, create_recoverer ) + @defer.inlineCallbacks def start(self): # check for any DOWN ASes and start recoverers for them. - _Recoverer.start( + recoverers = yield _Recoverer.start( self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered ) + self.txn_ctrl.add_recoverers(recoverers) self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): @@ -91,12 +93,34 @@ class AppServiceTransaction(object): self.events = events def send(self, as_api): - # TODO sends this transaction using this as_api - pass + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) def complete(self, store): - # TODO increment txn id on AS and nuke txn contents from db - pass + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) class _EventGrouper(object): @@ -125,6 +149,8 @@ class _TransactionController(object): self.as_api = as_api self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn + # keep track of how many recoverers there are + self.recoverers = [] def start_polling(self): groups = self.event_grouper.drain_groups() @@ -144,6 +170,10 @@ class _TransactionController(object): # TODO mark AS as UP pass + def add_recoverers(self, recoverers): + for r in recoverers: + self.recoverers.append(r) + def _start_recoverer(self, service): recoverer = self.recoverer_fn(service, self.on_recovered) recoverer.recover() @@ -161,9 +191,15 @@ class _TransactionController(object): class _Recoverer(object): @staticmethod + @defer.inlineCallbacks def start(clock, store, as_api, callback): - # TODO check for DOWN ASes and init recoverers - pass + services = yield store.get_failing_appservices() + recoverers = [ + _Recoverer(clock, store, as_api, s, callback) for s in services + ] + for r in recoverers: + r.recover() + defer.returnValue(recoverers) def __init__(self, clock, store, as_api, service, callback): self.clock = clock diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e30265750a..c1762692b9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -336,3 +336,31 @@ class ApplicationServiceStore(SQLBaseStore): hs_token=service["hs_token"], sender=service["sender"] )) + + +class ApplicationServiceTransactionStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceTransactionStore, self).__init__(hs) + + def get_failing_appservices(self): + """Get a list of application services which are down. + + Returns: + A Deferred which resolves to a list of ApplicationServices, which + may be empty. + """ + pass + + def complete_appservice_txn(self, txn_id, service): + """Completes an application service transaction. + + Args: + txn_id(str): The transaction ID being completed. + service(ApplicationService): The application service which was sent + this transaction. + Returns: + A Deferred which resolves to True if this transaction was completed + successfully. + """ + pass -- cgit 1.5.1 From f260cb72cd3435d540411962a92ca2a9fd333eb1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 15:12:24 +0000 Subject: Flesh out more stub functions. --- synapse/appservice/__init__.py | 5 +++++ synapse/appservice/scheduler.py | 37 +++++++++++++++++++++++++++++-------- synapse/storage/appservice.py | 17 +++++++++++++++-- tests/appservice/test_scheduler.py | 5 +++-- 4 files changed, 52 insertions(+), 12 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index a268a6bcc4..cc6c381566 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -20,6 +20,11 @@ import re logger = logging.getLogger(__name__) +class ApplicationServiceState(object): + DOWN = "down" + UP = "up" + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 645d7bf6b2..99e83747a8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,7 +49,11 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ +from synapse.appservice import ApplicationServiceState from twisted.internet import defer +import logging + +logger = logging.getLogger(__name__) class AppServiceScheduler(object): @@ -162,21 +166,36 @@ class _TransactionController(object): if txn.send(self.as_api): txn.complete(self.store) else: - # TODO mark AS as down self._start_recoverer(service) self.clock.call_later(1000, self.start_polling) - def on_recovered(self, service): - # TODO mark AS as UP - pass + @defer.inlineCallbacks + def on_recovered(self, recoverer): + applied_state = yield self.store.set_appservice_state( + recoverer.service, + ApplicationServiceState.UP + ) + if not applied_state: + logger.error("Failed to apply appservice state UP to service %s", + recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) + @defer.inlineCallbacks def _start_recoverer(self, service): - recoverer = self.recoverer_fn(service, self.on_recovered) - recoverer.recover() + applied_state = yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + if applied_state: + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + else: + logger.error("Failed to apply appservice state DOWN to service %s", + service) def _is_service_up(self, service): pass @@ -193,7 +212,9 @@ class _Recoverer(object): @staticmethod @defer.inlineCallbacks def start(clock, store, as_api, callback): - services = yield store.get_failing_appservices() + services = yield store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) recoverers = [ _Recoverer(clock, store, as_api, s, callback) for s in services ] @@ -228,7 +249,7 @@ class _Recoverer(object): self._set_service_recovered() def _set_service_recovered(self): - self.callback(self.service) + self.callback(self) @defer.inlineCallbacks def _get_oldest_txn(self): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c1762692b9..214f6d99c5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -343,15 +343,28 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceTransactionStore, self).__init__(hs) - def get_failing_appservices(self): - """Get a list of application services which are down. + def get_appservices_by_state(self, state): + """Get a list of application services based on their state. + Args: + state(ApplicationServiceState): The state to filter on. Returns: A Deferred which resolves to a list of ApplicationServices, which may be empty. """ pass + def set_appservice_state(self, service, state): + """Set the application service state. + + Args: + service(ApplicationService): The service whose state to set. + state(ApplicationServiceState): The connectivity state to apply. + Returns: + A Deferred which resolves to True if the state was set successfully. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 1e3eb9e1cc..ec8f77c54b 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -57,7 +57,8 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns self.assertEquals(2, self.store.get_oldest_txn.call_count) - self.assertEquals(1, self.callback.call_count) + self.callback.assert_called_once_with(self.recoverer) + self.assertEquals(self.recoverer.service, self.service) def test_recover_retry_txn(self): txn = Mock() @@ -91,7 +92,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.clock.advance_time(16000) self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.complete.call_count) - self.assertEquals(1, self.callback.call_count) + self.callback.assert_called_once_with(self.recoverer) class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): -- cgit 1.5.1 From 7d3491c74180461dc9d49fc89dad233e240ac475 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 15:17:50 +0000 Subject: Add some loggers --- synapse/appservice/scheduler.py | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 99e83747a8..2b3aa3b0ea 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -171,6 +171,10 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): + self.recoverers.remove(recoverer) + logger.info("Successfully recovered application service: %s", + recoverer.service) + logger.info("Active recoverers: %s", len(self.recoverers)) applied_state = yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP @@ -182,6 +186,8 @@ class _TransactionController(object): def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) + if len(recoverers) > 0: + logger.info("Active recoverers: %s", len(self.recoverers)) @defer.inlineCallbacks def _start_recoverer(self, service): @@ -190,6 +196,10 @@ class _TransactionController(object): ApplicationServiceState.DOWN ) if applied_state: + logger.info( + "Application service falling behind. Starting recoverer. %s", + service + ) recoverer = self.recoverer_fn(service, self.on_recovered) self.add_recoverers([recoverer]) recoverer.recover() -- cgit 1.5.1 From 0354659f9d8b60b9edc78b0b597bceb52b8c7b2b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:09:05 +0000 Subject: Finish synapse.appservice.scheduler implementation. With tests to assert behaviour. Not hooked up yet. Stub datastore methods not implemented yet. --- synapse/appservice/__init__.py | 39 +++++++++++++ synapse/appservice/scheduler.py | 63 ++++---------------- synapse/storage/appservice.py | 22 +++++++ tests/appservice/test_scheduler.py | 115 ++++++++++++++++++++++++++++++++++++- 4 files changed, 186 insertions(+), 53 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index cc6c381566..743a8278ad 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -25,6 +25,45 @@ class ApplicationServiceState(object): UP = "up" +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) + + def complete(self, store): + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 2b3aa3b0ea..50ad3b8e83 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -88,45 +88,6 @@ class AppServiceScheduler(object): self.event_grouper.on_receive(service, event) -class AppServiceTransaction(object): - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - def send(self, as_api): - """Sends this transaction using the provided AS API interface. - - Args: - as_api(ApplicationServiceApi): The API to use to send. - Returns: - A Deferred which resolves to True if the transaction was sent. - """ - return as_api.push_bulk( - service=self.service, - events=self.events, - txn_id=self.id - ) - - def complete(self, store): - """Completes this transaction as successful. - - Marks this transaction ID on the application service and removes the - transaction contents from the database. - - Args: - store: The database store to operate on. - Returns: - A Deferred which resolves to True if the transaction was completed. - """ - return store.complete_appservice_txn( - service=self.service, - txn_id=self.id - ) - - class _EventGrouper(object): """Groups events for the same application service together. """ @@ -156,14 +117,18 @@ class _TransactionController(object): # keep track of how many recoverers there are self.recoverers = [] + @defer.inlineCallbacks def start_polling(self): groups = self.event_grouper.drain_groups() for service in groups: - txn_id = self._get_next_txn_id(service) - txn = AppServiceTransaction(service, txn_id, groups[service]) - self._store_txn(txn) - if self._is_service_up(service): - if txn.send(self.as_api): + txn = yield self.store.create_appservice_txn( + service=service, + events=groups[service] + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: txn.complete(self.store) else: self._start_recoverer(service) @@ -207,14 +172,10 @@ class _TransactionController(object): logger.error("Failed to apply appservice state DOWN to service %s", service) + @defer.inlineCallbacks def _is_service_up(self, service): - pass - - def _get_next_txn_id(self, service): - pass # TODO work out the next txn_id for this service - - def _store_txn(self, txn): - pass + state = yield self.store.get_appservice_state(service) + defer.returnValue(state == ApplicationServiceState.UP) class _Recoverer(object): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 214f6d99c5..6fde7dcc66 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -354,6 +354,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def get_appservice_state(self, service): + """Get the application service state. + + Args: + service(ApplicationService): The service whose state to set. + Returns: + A Deferred which resolves to ApplicationServiceState. + """ + pass + def set_appservice_state(self, service, state): """Set the application service state. @@ -365,6 +375,18 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def create_appservice_txn(self, service, events): + """Atomically creates a new transaction for this application service + with the given list of events. + + Args: + service(ApplicationService): The service who the transaction is for. + events(list): A list of events to put in the transaction. + Returns: + ApplicationServiceTransaction: A new transaction. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index ec8f77c54b..a31755da67 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, AppServiceTransaction, _EventGrouper, - _TransactionController, _Recoverer + AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -22,6 +22,116 @@ from mock import Mock from tests import unittest +class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.store = Mock() + self.as_api = Mock() + self.event_grouper = Mock() + self.recoverer = Mock() + self.recoverer_fn = Mock(return_value=self.recoverer) + self.txnctrl = _TransactionController( + clock=self.clock, store=self.store, as_api=self.as_api, + event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + ) + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is successfully sent. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + txn.send = Mock(return_value=defer.succeed(True)) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made + txn.complete.assert_called_once_with(self.store) # txn completed + + def test_poll_single_group_service_down(self): + # Test: The AS is down so it shouldn't push; Recoverers will do it. + # It should still make a transaction though. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + + self.event_grouper.drain_groups = Mock(return_value=groups) + txn = Mock(id="idhere", service=service, events=events) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.DOWN) + ) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, txn.send.call_count) # txn not sent though + self.assertEquals(0, txn.complete.call_count) # or completed + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is not sent. A Recoverer is made and + # started. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + self.store.set_appservice_state = Mock(return_value=defer.succeed(True)) + txn.send = Mock(return_value=defer.succeed(False)) # fails to send + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events + ) + self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made + self.assertEquals(1, self.recoverer.recover.call_count) # and invoked + self.assertEquals(1, len(self.txnctrl.recoverers)) # and stored + self.assertEquals(0, txn.complete.call_count) # txn not completed + self.store.set_appservice_state.assert_called_once_with( + service, ApplicationServiceState.DOWN # service marked as down + ) + + def test_poll_no_groups(self): + self.as_api.push_bulk = Mock() + self.event_grouper.drain_groups = Mock(return_value={}) + self.txnctrl.start_polling() + self.assertEquals(0, self.as_api.push_bulk.call_count) + + class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def setUp(self): @@ -94,6 +204,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) + class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def setUp(self): -- cgit 1.5.1 From 2602ddc379f9bede21cafc8c8f7f57dec44cf69d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:16:14 +0000 Subject: Apply clarity and docstrings --- synapse/appservice/scheduler.py | 2 +- synapse/storage/appservice.py | 14 +++++++++++++- tests/appservice/test_scheduler.py | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 50ad3b8e83..514148c947 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -224,5 +224,5 @@ class _Recoverer(object): @defer.inlineCallbacks def _get_oldest_txn(self): - txn = yield self.store.get_oldest_txn(self.service) + txn = yield self.store.get_oldest_unsent_txn(self.service) defer.returnValue(txn) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 6fde7dcc66..4447c8a2e1 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -383,7 +383,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The service who the transaction is for. events(list): A list of events to put in the transaction. Returns: - ApplicationServiceTransaction: A new transaction. + AppServiceTransaction: A new transaction. """ pass @@ -399,3 +399,15 @@ class ApplicationServiceTransactionStore(SQLBaseStore): successfully. """ pass + + def get_oldest_unsent_txn(self, service): + """Get the oldest transaction which has not been sent for this + service. + + Args: + service(ApplicationService): The app service to get the oldest txn. + Returns: + A Deferred which resolves to an AppServiceTransaction or + None. + """ + pass diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index a31755da67..f75a6f5d95 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer + _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock -- cgit 1.5.1 From 64345b75597cba56e12a172fb227ac2c67993bbd Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:41:19 +0000 Subject: Upper bound the backoff. --- synapse/appservice/scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 514148c947..ee5978da6e 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -214,7 +214,9 @@ class _Recoverer(object): self.backoff_counter = 1 yield self.retry() else: - self.backoff_counter += 1 + # cap the backoff to be around 18h => (2^16) = 65536 secs + if self.backoff_counter < 16: + self.backoff_counter += 1 self.recover() else: self._set_service_recovered() -- cgit 1.5.1 From 01c099d9ef2b3891643845031c917fd0dc41d954 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:16:47 +0000 Subject: Add appservice txns sql schema --- synapse/storage/__init__.py | 2 +- synapse/storage/appservice.py | 6 +++++ .../storage/schema/delta/15/appservice_txns.sql | 31 ++++++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/15/appservice_txns.sql (limited to 'synapse') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a3ff995695..dfce5224a9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,7 +57,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 4447c8a2e1..eec8fbd592 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -385,6 +385,8 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ + # TODO: work out txn id (highest txn id for this service += 1) + # TODO: Within same db transaction, Insert new txn into txn table pass def complete_appservice_txn(self, txn_id, service): @@ -398,6 +400,8 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to True if this transaction was completed successfully. """ + # TODO: Set current txn_id for AS to 'txn_id' + # TODO: Delete txn contents pass def get_oldest_unsent_txn(self, service): @@ -410,4 +414,6 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ + # TODO: Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) pass diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql new file mode 100644 index 0000000000..11f0c799aa --- /dev/null +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -0,0 +1,31 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id INTEGER PRIMARY KEY, + state TEXT NOT NULL, + last_txn TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id INTEGER NOT NULL, + txn_id INTEGER NOT NULL, + content TEXT NOT NULL, + UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK +); + + + -- cgit 1.5.1 From 4a6afa6abf6c90c393bc3fa00e40d3927fc0c6c1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:27:55 +0000 Subject: Assign the AS ID from the database; replace old placeholder txn id. --- synapse/appservice/__init__.py | 4 ++-- synapse/storage/appservice.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 743a8278ad..c60db16b74 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -79,13 +79,13 @@ class ApplicationService(object): NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] def __init__(self, token, url=None, namespaces=None, hs_token=None, - sender=None, txn_id=None): + sender=None, id=None): self.token = token self.url = url self.hs_token = hs_token self.sender = sender self.namespaces = self._check_namespaces(namespaces) - self.txn_id = txn_id + self.id = id def _check_namespaces(self, namespaces): # Sanity check that it is of the form: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index eec8fbd592..582269b8d5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -302,6 +302,7 @@ class ApplicationServiceStore(SQLBaseStore): if as_token not in services: # add the service services[as_token] = { + "id": res["as_id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -326,7 +327,6 @@ class ApplicationServiceStore(SQLBaseStore): except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) - # TODO get last successful txn id f.e. service for service in services.values(): logger.info("Found application service: %s", service) self.services_cache.append(ApplicationService( @@ -334,7 +334,8 @@ class ApplicationServiceStore(SQLBaseStore): url=service["url"], namespaces=service["namespaces"], hs_token=service["hs_token"], - sender=service["sender"] + sender=service["sender"], + id=service["id"] )) -- cgit 1.5.1 From 406d32f8b514a572627eef1326d472e2825b2fe1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:35:14 +0000 Subject: Start implementing ApplicationServiceTransactionStore --- synapse/storage/appservice.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 582269b8d5..0b272e82dd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -374,7 +374,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: A Deferred which resolves to True if the state was set successfully. """ - pass + return self._simple_upsert( + "application_services_state", + dict(as_id=service.id), + dict(state=state) + ) def create_appservice_txn(self, service, events): """Atomically creates a new transaction for this application service -- cgit 1.5.1 From 1c2dcf762a8fe28390e9a98a01577aaadca7f1c0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 13:10:31 +0000 Subject: Partially implement txn store methods with tests. --- synapse/storage/appservice.py | 61 ++++++++++---- tests/storage/test_appservice.py | 171 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 213 insertions(+), 19 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 0b272e82dd..37078f9ef0 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson from simplejson import JSONDecodeError +import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -142,7 +142,7 @@ class ApplicationServiceStore(SQLBaseStore): txn.execute( "INSERT INTO application_services_regex(" "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, simplejson.dumps(regex_obj)) + (as_id, ns_int, json.dumps(regex_obj)) ) return True @@ -277,12 +277,7 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - @defer.inlineCallbacks - def _populate_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + def _parse_services_dict(self, results): # SQL results in the form: # [ # { @@ -296,13 +291,12 @@ class ApplicationServiceStore(SQLBaseStore): # } # ] services = {} - results = yield self._execute_and_decode(sql) for res in results: as_token = res["token"] if as_token not in services: # add the service services[as_token] = { - "id": res["as_id"], + "id": res["id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -320,16 +314,16 @@ class ApplicationServiceStore(SQLBaseStore): try: services[as_token]["namespaces"][ ApplicationService.NS_LIST[ns_int]].append( - simplejson.loads(res["regex"]) + json.loads(res["regex"]) ) except IndexError: logger.error("Bad namespace enum '%s'. %s", ns_int, res) except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) + service_list = [] for service in services.values(): - logger.info("Found application service: %s", service) - self.services_cache.append(ApplicationService( + service_list.append(ApplicationService( token=service["token"], url=service["url"], namespaces=service["namespaces"], @@ -337,6 +331,21 @@ class ApplicationServiceStore(SQLBaseStore): sender=service["sender"], id=service["id"] )) + return service_list + + @defer.inlineCallbacks + def _populate_cache(self): + """Populates the ApplicationServiceCache from the database.""" + sql = ("SELECT * FROM application_services LEFT JOIN " + "application_services_regex ON application_services.id = " + "application_services_regex.as_id") + + results = yield self._execute_and_decode(sql) + services = self._parse_services_dict(results) + + for service in services: + logger.info("Found application service: %s", service) + self.services_cache.append(service) class ApplicationServiceTransactionStore(SQLBaseStore): @@ -344,6 +353,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceTransactionStore, self).__init__(hs) + @defer.inlineCallbacks def get_appservices_by_state(self, state): """Get a list of application services based on their state. @@ -353,8 +363,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to a list of ApplicationServices, which may be empty. """ - pass + sql = ( + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN " + "application_services AS a ON a.id=s.as_id LEFT JOIN " + "application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + ) + results = yield self._execute_and_decode(sql, state) + # NB: This assumes this class is linked with ApplicationServiceStore + defer.returnValue(self._parse_services_dict(results)) + @defer.inlineCallbacks def get_appservice_state(self, service): """Get the application service state. @@ -363,7 +381,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: A Deferred which resolves to ApplicationServiceState. """ - pass + result = yield self._simple_select_one( + "application_services_state", + dict(as_id=service.id), + ["state"], + allow_none=True + ) + if result: + defer.returnValue(result.get("state")) + return + defer.returnValue(None) def set_appservice_state(self, service, state): """Set the application service state. @@ -372,7 +399,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The service whose state to set. state(ApplicationServiceState): The connectivity state to apply. Returns: - A Deferred which resolves to True if the state was set successfully. + A Deferred which resolves when the state was set successfully. """ return self._simple_upsert( "application_services_state", diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index ca5b92ec85..30c0b43d96 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -15,9 +15,11 @@ from tests import unittest from twisted.internet import defer -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.server import HomeServer -from synapse.storage.appservice import ApplicationServiceStore +from synapse.storage.appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -114,3 +116,168 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): def test_retrieval_of_all_services(self): services = yield self.store.get_app_services() self.assertEquals(len(services), 3) + + +class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def setUp(self): + self.db_pool = SQLiteMemoryDbPool() + yield self.db_pool.prepare() + hs = HomeServer( + "test", db_pool=self.db_pool, clock=MockClock(), config=Mock() + ) + self.as_list = [ + { + "token": "token1", + "url": "https://matrix-as.org", + "id": 3 + }, + { + "token": "alpha_tok", + "url": "https://alpha.com", + "id": 5 + }, + { + "token": "beta_tok", + "url": "https://beta.com", + "id": 6 + }, + { + "token": "delta_tok", + "url": "https://delta.com", + "id": 7 + }, + ] + for s in self.as_list: + yield self._add_service(s["id"], s["url"], s["token"]) + self.store = TestTransactionStore(hs) + + def _add_service(self, as_id, url, token): + return self.db_pool.runQuery( + "INSERT INTO application_services(id, url, token) VALUES(?,?,?)", + (as_id, url, token) + ) + + def _set_state(self, id, state, txn=None): + return self.db_pool.runQuery( + "INSERT INTO application_services_state(as_id, state, last_txn) " + "VALUES(?,?,?)", + (id, state, txn) + ) + + @defer.inlineCallbacks + def test_get_appservice_state_none(self): + service = Mock(id=999) + state = yield self.store.get_appservice_state(service) + self.assertEquals(None, state) + + @defer.inlineCallbacks + def test_get_appservice_state_up(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.UP + ) + service = Mock(id=self.as_list[0]["id"]) + state = yield self.store.get_appservice_state(service) + self.assertEquals(ApplicationServiceState.UP, state) + + @defer.inlineCallbacks + def test_get_appservice_state_down(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.UP + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[2]["id"], ApplicationServiceState.DOWN + ) + service = Mock(id=self.as_list[1]["id"]) + state = yield self.store.get_appservice_state(service) + self.assertEquals(ApplicationServiceState.DOWN, state) + + @defer.inlineCallbacks + def test_get_appservices_by_state_none(self): + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(0, len(services)) + + @defer.inlineCallbacks + def test_set_appservices_state_down(self): + service = Mock(id=self.as_list[1]["id"]) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + rows = yield self.db_pool.runQuery( + "SELECT as_id FROM application_services_state WHERE state=?", + (ApplicationServiceState.DOWN,) + ) + self.assertEquals(service.id, rows[0][0]) + + @defer.inlineCallbacks + def test_set_appservices_state_multiple_up(self): + service = Mock(id=self.as_list[1]["id"]) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.UP + ) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.UP + ) + rows = yield self.db_pool.runQuery( + "SELECT as_id FROM application_services_state WHERE state=?", + (ApplicationServiceState.UP,) + ) + self.assertEquals(service.id, rows[0][0]) + + @defer.inlineCallbacks + def test_get_appservices_by_state_single(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.UP + ) + + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(1, len(services)) + self.assertEquals(self.as_list[0]["id"], services[0].id) + + @defer.inlineCallbacks + def test_get_appservices_by_state_multiple(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.UP + ) + yield self._set_state( + self.as_list[2]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[3]["id"], ApplicationServiceState.UP + ) + + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(2, len(services)) + self.assertEquals(self.as_list[2]["id"], services[0].id) + self.assertEquals(self.as_list[0]["id"], services[1].id) + + +# required for ApplicationServiceTransactionStoreTestCase tests +class TestTransactionStore(ApplicationServiceTransactionStore, + ApplicationServiceStore): + + def __init__(self, hs): + super(TestTransactionStore, self).__init__(hs) -- cgit 1.5.1 From 1ead1caa18bdbf708446f1faa3d6f3dd13e63c29 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 13:54:20 +0000 Subject: Implement create_appservice_txn with tests. --- synapse/storage/appservice.py | 46 ++++++++++++++++++++++++--- tests/storage/test_appservice.py | 67 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 37078f9ef0..1360a00eae 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,9 +17,10 @@ from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer +from syutil.jsonutil import encode_canonical_json from synapse.api.constants import Membership from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService, ApplicationServiceState +from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -417,9 +418,46 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ - # TODO: work out txn id (highest txn id for this service += 1) - # TODO: Within same db transaction, Insert new txn into txn table - pass + return self.runInteraction( + "create_appservice_txn", + self._create_appservice_txn, + service, events + ) + + def _create_appservice_txn(self, txn, service, events): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + result = txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service.id,) + ) + last_txn_id = result.fetchone() + if last_txn_id is None: # no row exists + last_txn_id = 0 + else: + last_txn_id = int(last_txn_id[0]) # select 'last_txn' col + + result = txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = result.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 + + new_txn_id = max(highest_txn_id, last_txn_id) + 1 + + # Insert new txn into txn table + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, content) " + "VALUES(?,?,?)", + (service.id, new_txn_id, encode_canonical_json(events)) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 30c0b43d96..7a8cdb5593 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -21,6 +21,7 @@ from synapse.storage.appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) +import json from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -166,6 +167,20 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): (id, state, txn) ) + def _insert_txn(self, as_id, txn_id, content): + return self.db_pool.runQuery( + "INSERT INTO application_services_txns(as_id, txn_id, content) " + "VALUES(?,?,?)", + (as_id, txn_id, json.dumps(content)) + ) + + def _set_last_txn(self, as_id, txn_id): + return self.db_pool.runQuery( + "INSERT INTO application_services_state(as_id, last_txn, state) " + "VALUES(?,?,?)", + (as_id, txn_id, ApplicationServiceState.UP) + ) + @defer.inlineCallbacks def test_get_appservice_state_none(self): service = Mock(id=999) @@ -237,6 +252,58 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) self.assertEquals(service.id, rows[0][0]) + @defer.inlineCallbacks + def test_create_appservice_txn_first(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 1) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_older_last_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) # AS is falling behind + yield self._insert_txn(service.id, 9644, events) + yield self._insert_txn(service.id, 9645, events) + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9646) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_up_to_date_last_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9644) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_up_fuzzing(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) + + # dump in rows with higher IDs to make sure the queries aren't wrong. + yield self._set_last_txn(self.as_list[1]["id"], 119643) + yield self._set_last_txn(self.as_list[2]["id"], 9) + yield self._set_last_txn(self.as_list[3]["id"], 9643) + yield self._insert_txn(self.as_list[1]["id"], 119644, events) + yield self._insert_txn(self.as_list[1]["id"], 119645, events) + yield self._insert_txn(self.as_list[1]["id"], 119646, events) + yield self._insert_txn(self.as_list[2]["id"], 10, events) + yield self._insert_txn(self.as_list[3]["id"], 9643, events) + + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9644) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + @defer.inlineCallbacks def test_get_appservices_by_state_single(self): yield self._set_state( -- cgit 1.5.1 From 0a60bbf4fac4262da3fee702ca46d2f019597ef1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 15:53:03 +0000 Subject: Finish appservice txn storage impl and tests. --- synapse/storage/appservice.py | 85 ++++++++++++++++++---- .../storage/schema/delta/15/appservice_txns.sql | 2 +- tests/storage/test_appservice.py | 68 +++++++++++++++++ 3 files changed, 139 insertions(+), 16 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 1360a00eae..d89b0cc8c9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -429,15 +429,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # The highest id may be the last one sent (in which case it is last_txn) # or it may be the highest in the txns list (which are waiting to be/are # being sent) - result = txn.execute( - "SELECT last_txn FROM application_services_state WHERE as_id=?", - (service.id,) - ) - last_txn_id = result.fetchone() - if last_txn_id is None: # no row exists - last_txn_id = 0 - else: - last_txn_id = int(last_txn_id[0]) # select 'last_txn' col + last_txn_id = self._get_last_txn(txn, service.id) result = txn.execute( "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", @@ -467,12 +459,43 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The application service which was sent this transaction. Returns: - A Deferred which resolves to True if this transaction was completed + A Deferred which resolves if this transaction was stored successfully. """ - # TODO: Set current txn_id for AS to 'txn_id' - # TODO: Delete txn contents - pass + return self.runInteraction( + "complete_appservice_txn", + self._complete_appservice_txn, + txn_id, service + ) + + def _complete_appservice_txn(self, txn, txn_id, service): + txn_id = int(txn_id) + + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) + ) + + # Delete txn contents + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) def get_oldest_unsent_txn(self, service): """Get the oldest transaction which has not been sent for this @@ -484,6 +507,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ - # TODO: Monotonically increasing txn ids, so just select the smallest + return self.runInteraction( + "get_oldest_unsent_appservice_txn", + self._get_oldest_unsent_txn, + service + ) + + def _get_oldest_unsent_txn(self, txn, service): + # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) - pass + result = txn.execute( + "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + entry = self.cursor_to_dict(result)[0] + + if not entry or entry["txn_id"] is None: + # the min(txn_id) part will force a row, so entry may not be None + return None + + return AppServiceTransaction( + service=service, id=entry["txn_id"], events=json.loads( + entry["content"] + ) + ) + + def _get_last_txn(self, txn, service_id): + result = txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service_id,) + ) + last_txn_id = result.fetchone() + if last_txn_id is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 11f0c799aa..ff15aa019e 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -15,7 +15,7 @@ CREATE TABLE IF NOT EXISTS application_services_state( as_id INTEGER PRIMARY KEY, - state TEXT NOT NULL, + state TEXT, last_txn TEXT, FOREIGN KEY(as_id) REFERENCES application_services(id) ); diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 7a8cdb5593..d1809c7f3b 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -304,6 +304,74 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) + @defer.inlineCallbacks + def test_complete_appservice_txn_first_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"foo": "bar"}] + txn_id = 1 + + yield self._insert_txn(service.id, txn_id, events) + yield self.store.complete_appservice_txn(txn_id=txn_id, service=service) + + res = yield self.db_pool.runQuery( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service.id,) + ) + self.assertEquals(1, len(res)) + self.assertEquals(str(txn_id), res[0][0]) + + res = yield self.db_pool.runQuery( + "SELECT * FROM application_services_txns WHERE txn_id=?", + (txn_id,) + ) + self.assertEquals(0, len(res)) + + @defer.inlineCallbacks + def test_complete_appservice_txn_existing_in_state_table(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"foo": "bar"}] + txn_id = 5 + yield self._set_last_txn(service.id, 4) + yield self._insert_txn(service.id, txn_id, events) + yield self.store.complete_appservice_txn(txn_id=txn_id, service=service) + + res = yield self.db_pool.runQuery( + "SELECT last_txn, state FROM application_services_state WHERE " + "as_id=?", + (service.id,) + ) + self.assertEquals(1, len(res)) + self.assertEquals(str(txn_id), res[0][0]) + self.assertEquals(ApplicationServiceState.UP, res[0][1]) + + res = yield self.db_pool.runQuery( + "SELECT * FROM application_services_txns WHERE txn_id=?", + (txn_id,) + ) + self.assertEquals(0, len(res)) + + @defer.inlineCallbacks + def test_get_oldest_unsent_txn_none(self): + service = Mock(id=self.as_list[0]["id"]) + + txn = yield self.store.get_oldest_unsent_txn(service) + self.assertEquals(None, txn) + + @defer.inlineCallbacks + def test_get_oldest_unsent_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + + yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"}) + yield self._insert_txn(service.id, 10, events) + yield self._insert_txn(service.id, 11, [{"foo":"bar"}]) + yield self._insert_txn(service.id, 12, [{"argh":"bargh"}]) + + txn = yield self.store.get_oldest_unsent_txn(service) + self.assertEquals(service, txn.service) + self.assertEquals(10, txn.id) + self.assertEquals(events, txn.events) + @defer.inlineCallbacks def test_get_appservices_by_state_single(self): yield self._set_state( -- cgit 1.5.1 From 21fd84dcb8645a555cc35adb8b2a5a68536b8087 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:01:19 +0000 Subject: Use seconds; start gluing in the AS scheduler into the AS handler. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/__init__.py | 8 +++++++- synapse/handlers/appservice.py | 17 ++++++++++++++--- synapse/storage/__init__.py | 7 +++++-- tests/appservice/test_scheduler.py | 10 +++++----- tests/handlers/test_appservice.py | 7 +++++-- 6 files changed, 38 insertions(+), 15 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index ee5978da6e..068d4bd087 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -132,7 +132,7 @@ class _TransactionController(object): txn.complete(self.store) else: self._start_recoverer(service) - self.clock.call_later(1000, self.start_polling) + self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks def on_recovered(self, recoverer): @@ -202,7 +202,7 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry) + self.clock.call_later((2 ** self.backoff_counter), self.retry) @defer.inlineCallbacks def retry(self): diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 8d345bf936..0c51d615ec 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( @@ -54,7 +55,12 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( - hs, ApplicationServiceApi(hs) + hs, asapi, AppServiceScheduler( + clock=hs.get_clock(), + store=hs.get_datastore(), + as_api=asapi + ) ) self.sync_handler = SyncHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2c488a46f6..f3cd458e6b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -26,15 +26,22 @@ import logging logger = logging.getLogger(__name__) +def log_failure(failure): + logger.error("Application Services Failure: %s", failure.value) + logger.error(failure.getTraceback()) + + # NB: Purposefully not inheriting BaseHandler since that contains way too much # setup code which this handler does not need or use. This makes testing a lot # easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api): + def __init__(self, hs, appservice_api, appservice_scheduler): self.store = hs.get_datastore() self.hs = hs self.appservice_api = appservice_api + self.scheduler = appservice_scheduler + self.started_scheduler = False @defer.inlineCallbacks def register(self, app_service): @@ -90,9 +97,13 @@ class ApplicationServicesHandler(object): if event.type == EventTypes.Member: yield self._check_user_exists(event.state_key) - # Fork off pushes to these services - XXX First cut, best effort + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services for service in services: - self.appservice_api.push(service, event) + self.scheduler.submit_event_for_as(service, event) @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index dfce5224a9..6c159b52a0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.api.constants import EventTypes -from .appservice import ApplicationServiceStore +from .appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -79,7 +81,8 @@ class DataStore(RoomMemberStore, RoomStore, RejectionsStore, FilteringStore, PusherStore, - PushRuleStore + PushRuleStore, + ApplicationServiceTransactionStore ): def __init__(self, hs): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 9532bf66b8..e18e879319 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -162,7 +162,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=True) # wait for exp backoff - self.clock.advance_time(2000) + self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns @@ -185,21 +185,21 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.recoverer.recover() self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=False) - self.clock.advance_time(2000) + self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - self.clock.advance_time(4000) + self.clock.advance_time(4) self.assertEquals(2, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - self.clock.advance_time(8000) + self.clock.advance_time(8) self.assertEquals(3, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) txn.send = Mock(return_value=True) # successfully send the txn pop_txn = True # returns the txn the first time, then no more. - self.clock.advance_time(16000) + self.clock.advance_time(16) self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a2c541317c..06cb1dd4cf 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -27,10 +27,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): def setUp(self): self.mock_store = Mock() self.mock_as_api = Mock() + self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) self.handler = ApplicationServicesHandler( - hs, self.mock_as_api + hs, self.mock_as_api, self.mock_scheduler ) @defer.inlineCallbacks @@ -52,7 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() yield self.handler.notify_interested_services(event) - self.mock_as_api.push.assert_called_once_with(interested_service, event) + self.mock_scheduler.submit_event_for_as.assert_called_once_with( + interested_service, event + ) @defer.inlineCallbacks def test_query_room_alias_exists(self): -- cgit 1.5.1 From b98cd03193476dea5f8b47e79d4122bb18449ae2 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:25:20 +0000 Subject: Use event IDs instead of dumping event content in the txns table. --- synapse/storage/appservice.py | 14 +++++----- .../storage/schema/delta/15/appservice_txns.sql | 2 +- tests/storage/test_appservice.py | 30 ++++++++++++---------- 3 files changed, 26 insertions(+), 20 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d89b0cc8c9..c3c0a0bd43 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -442,10 +442,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): new_txn_id = max(highest_txn_id, last_txn_id) + 1 # Insert new txn into txn table + event_ids = [e.event_id for e in events] txn.execute( - "INSERT INTO application_services_txns(as_id, txn_id, content) " + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (service.id, new_txn_id, encode_canonical_json(events)) + (service.id, new_txn_id, json.dumps(event_ids)) ) return AppServiceTransaction( service=service, id=new_txn_id, events=events @@ -491,7 +492,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): dict(last_txn=txn_id) ) - # Delete txn contents + # Delete txn self._simple_delete_txn( txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id) @@ -526,10 +527,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # the min(txn_id) part will force a row, so entry may not be None return None + event_ids = json.loads(entry["event_ids"]) + events = self._get_events_txn(event_ids) + return AppServiceTransaction( - service=service, id=entry["txn_id"], events=json.loads( - entry["content"] - ) + service=service, id=entry["txn_id"], events=events ) def _get_last_txn(self, txn, service_id): diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index ff15aa019e..13bbb2de2e 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS application_services_state( CREATE TABLE IF NOT EXISTS application_services_txns( as_id INTEGER NOT NULL, txn_id INTEGER NOT NULL, - content TEXT NOT NULL, + event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK ); diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index d1809c7f3b..e79599f7fb 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -167,11 +167,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): (id, state, txn) ) - def _insert_txn(self, as_id, txn_id, content): + def _insert_txn(self, as_id, txn_id, events): return self.db_pool.runQuery( - "INSERT INTO application_services_txns(as_id, txn_id, content) " + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (as_id, txn_id, json.dumps(content)) + (as_id, txn_id, json.dumps([e.event_id for e in events])) ) def _set_last_txn(self, as_id, txn_id): @@ -255,7 +255,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_first(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn = yield self.store.create_appservice_txn(service, events) self.assertEquals(txn.id, 1) self.assertEquals(txn.events, events) @@ -264,7 +264,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_older_last_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) # AS is falling behind yield self._insert_txn(service.id, 9644, events) yield self._insert_txn(service.id, 9645, events) @@ -276,7 +276,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_up_to_date_last_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) txn = yield self.store.create_appservice_txn(service, events) self.assertEquals(txn.id, 9644) @@ -286,7 +286,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_up_fuzzing(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) # dump in rows with higher IDs to make sure the queries aren't wrong. @@ -307,7 +307,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_complete_appservice_txn_first_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"foo": "bar"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 1 yield self._insert_txn(service.id, txn_id, events) @@ -329,7 +329,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_complete_appservice_txn_existing_in_state_table(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"foo": "bar"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 5 yield self._set_last_txn(service.id, 4) yield self._insert_txn(service.id, txn_id, events) @@ -360,12 +360,16 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] + other_events = [Mock(event_id="e5"), Mock(event_id="e6")] - yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"}) + # we aren't testing store._base stuff here, so mock this out + self.store._get_events_txn = Mock(return_value=events) + + yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) - yield self._insert_txn(service.id, 11, [{"foo":"bar"}]) - yield self._insert_txn(service.id, 12, [{"argh":"bargh"}]) + yield self._insert_txn(service.id, 11, other_events) + yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) -- cgit 1.5.1 From 04c9751f24885b974d564b3e5749b7fc9ce01c73 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:45:41 +0000 Subject: Bug fixes whilst putting it all together --- synapse/appservice/api.py | 1 + synapse/appservice/scheduler.py | 4 +++- synapse/storage/appservice.py | 9 ++++----- 3 files changed, 8 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c17fb219c5..3acb8867a2 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -79,6 +79,7 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("push_bulk: Missing txn ID sending events to %s", service.url) txn_id = str(0) + txn_id = str(txn_id) uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 068d4bd087..3ee2406463 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -175,7 +175,7 @@ class _TransactionController(object): @defer.inlineCallbacks def _is_service_up(self, service): state = yield self.store.get_appservice_state(service) - defer.returnValue(state == ApplicationServiceState.UP) + defer.returnValue(state == ApplicationServiceState.UP or state is None) class _Recoverer(object): @@ -208,6 +208,8 @@ class _Recoverer(object): def retry(self): txn = yield self._get_oldest_txn() if txn: + logger.info("Retrying transaction %s for service %s", + txn.id, txn.service) if txn.send(self.as_api): txn.complete(self.store) # reset the backoff counter and retry immediately diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c3c0a0bd43..ab03106513 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -337,9 +337,8 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def _populate_cache(self): """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " + "application_services_regex AS r ON a.id = r.as_id") results = yield self._execute_and_decode(sql) services = self._parse_services_dict(results) @@ -528,7 +527,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return None event_ids = json.loads(entry["event_ids"]) - events = self._get_events_txn(event_ids) + events = self._get_events_txn(txn, event_ids) return AppServiceTransaction( service=service, id=entry["txn_id"], events=events @@ -540,7 +539,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): (service_id,) ) last_txn_id = result.fetchone() - if last_txn_id is None: # no row exists + if last_txn_id is None or last_txn_id[0] is None: # no row exists return 0 else: return int(last_txn_id[0]) # select 'last_txn' col -- cgit 1.5.1 From 7e0bba555c4abeb55cffc123270ceee858839496 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:48:37 +0000 Subject: Remove unused import --- synapse/storage/appservice.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index ab03106513..fe347dfd3c 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,7 +17,6 @@ from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer -from syutil.jsonutil import encode_canonical_json from synapse.api.constants import Membership from synapse.api.errors import StoreError from synapse.appservice import ApplicationService, AppServiceTransaction -- cgit 1.5.1 From db1fbc6c6fb23ab92dd712aa60f0ff46ea76b42c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 10 Mar 2015 10:04:20 +0000 Subject: Fix remaining scheduler bugs. Add more informative logging. --- synapse/appservice/api.py | 8 +++---- synapse/appservice/scheduler.py | 52 +++++++++++++++++------------------------ synapse/storage/appservice.py | 5 ++-- 3 files changed, 28 insertions(+), 37 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3acb8867a2..2a9becccb3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) - response = None try: - response = yield self.put_json( + yield self.put_json( uri=uri, json_body={ "events": events @@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) - if response: # just an empty json object - # TODO: Mark txn as sent successfully - defer.returnValue(True) + defer.returnValue(True) + return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3ee2406463..add1e3879c 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -77,6 +77,7 @@ class AppServiceScheduler(object): @defer.inlineCallbacks def start(self): + logger.info("Starting appservice scheduler") # check for any DOWN ASes and start recoverers for them. recoverers = yield _Recoverer.start( self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered @@ -137,40 +138,33 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): self.recoverers.remove(recoverer) - logger.info("Successfully recovered application service: %s", - recoverer.service) - logger.info("Active recoverers: %s", len(self.recoverers)) - applied_state = yield self.store.set_appservice_state( + logger.info("Successfully recovered application service AS ID %s", + recoverer.service.id) + logger.info("Remaining active recoverers: %s", len(self.recoverers)) + yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - if not applied_state: - logger.error("Failed to apply appservice state UP to service %s", - recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) if len(recoverers) > 0: - logger.info("Active recoverers: %s", len(self.recoverers)) + logger.info("New active recoverers: %s", len(self.recoverers)) @defer.inlineCallbacks def _start_recoverer(self, service): - applied_state = yield self.store.set_appservice_state( + yield self.store.set_appservice_state( service, ApplicationServiceState.DOWN ) - if applied_state: - logger.info( - "Application service falling behind. Starting recoverer. %s", - service - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() - else: - logger.error("Failed to apply appservice state DOWN to service %s", - service) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() @defer.inlineCallbacks def _is_service_up(self, service): @@ -190,6 +184,8 @@ class _Recoverer(object): _Recoverer(clock, store, as_api, s, callback) for s in services ] for r in recoverers: + logger.info("Starting recoverer for AS ID %s which was marked as " + "DOWN", r.service.id) r.recover() defer.returnValue(recoverers) @@ -206,12 +202,13 @@ class _Recoverer(object): @defer.inlineCallbacks def retry(self): - txn = yield self._get_oldest_txn() + txn = yield self.store.get_oldest_unsent_txn(self.service) if txn: - logger.info("Retrying transaction %s for service %s", - txn.id, txn.service) - if txn.send(self.as_api): - txn.complete(self.store) + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 yield self.retry() @@ -225,8 +222,3 @@ class _Recoverer(object): def _set_service_recovered(self): self.callback(self) - - @defer.inlineCallbacks - def _get_oldest_txn(self): - txn = yield self.store.get_oldest_unsent_txn(self.service) - defer.returnValue(txn) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index fe347dfd3c..c4b4f56c5d 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -293,6 +293,8 @@ class ApplicationServiceStore(SQLBaseStore): services = {} for res in results: as_token = res["token"] + if as_token is None: + continue if as_token not in services: # add the service services[as_token] = { @@ -516,11 +518,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) result = txn.execute( - "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?", + "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?", (service.id,) ) entry = self.cursor_to_dict(result)[0] - if not entry or entry["txn_id"] is None: # the min(txn_id) part will force a row, so entry may not be None return None -- cgit 1.5.1 From 835e01fc7047e34a813936544027596627a112df Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 10:16:59 +0000 Subject: Minor PR comment tweaks. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/appservice.py | 10 ++++++++-- synapse/storage/__init__.py | 2 +- synapse/storage/appservice.py | 6 +++--- tests/appservice/test_scheduler.py | 10 +++++----- 5 files changed, 19 insertions(+), 13 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index add1e3879c..8a3a6a880f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -86,7 +86,7 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.on_receive(service, event) + self.event_grouper.enqueue(service, event) class _EventGrouper(object): @@ -96,7 +96,7 @@ class _EventGrouper(object): def __init__(self): self.groups = {} # dict of {service: [events]} - def on_receive(self, service, event): + def enqueue(self, service, event): if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f3cd458e6b..a24f7f5587 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,8 +27,14 @@ logger = logging.getLogger(__name__) def log_failure(failure): - logger.error("Application Services Failure: %s", failure.value) - logger.error(failure.getTraceback()) + logger.error( + "Application Services Failure", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) # NB: Purposefully not inheriting BaseHandler since that contains way too much diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index efef859214..e752b035e6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -82,7 +82,7 @@ class DataStore(RoomMemberStore, RoomStore, FilteringStore, PusherStore, PushRuleStore, - ApplicationServiceTransactionStore + ApplicationServiceTransactionStore, ): def __init__(self, hs): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 670e1d56af..e928812bc9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -365,9 +365,9 @@ class ApplicationServiceTransactionStore(SQLBaseStore): may be empty. """ sql = ( - "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN " - "application_services AS a ON a.id=s.as_id LEFT JOIN " - "application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" + " application_services AS a ON a.id=s.as_id LEFT JOIN" + " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" ) results = yield self._execute_and_decode( "get_appservices_by_state", sql, state diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index e18e879319..4534d05b93 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -213,7 +213,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def test_drain_single_event(self): service = Mock() event = Mock() - self.grouper.on_receive(service, event) + self.grouper.enqueue(service, event) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals([event], groups[service]) @@ -225,7 +225,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): service = Mock() events = [Mock(), Mock(), Mock()] for e in events: - self.grouper.on_receive(service, e) + self.grouper.enqueue(service, e) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals(events, groups[service]) @@ -243,11 +243,11 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): services[2]: events_c } for e in events_b: - self.grouper.on_receive(services[1], e) + self.grouper.enqueue(services[1], e) for e in events_c: - self.grouper.on_receive(services[2], e) + self.grouper.enqueue(services[2], e) for e in events_a: - self.grouper.on_receive(services[0], e) + self.grouper.enqueue(services[0], e) groups = self.grouper.drain_groups() for service in services: -- cgit 1.5.1 From c9c444f56260b414d474ea7e9ae28a1a66400357 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 10:38:02 +0000 Subject: Wrap polling/retry blocks in try/excepts to avoid sending to other ASes breaking permanently should an error occur. --- synapse/appservice/scheduler.py | 68 +++++++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 29 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 8a3a6a880f..59a870e271 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -120,19 +120,22 @@ class _TransactionController(object): @defer.inlineCallbacks def start_polling(self): - groups = self.event_grouper.drain_groups() - for service in groups: - txn = yield self.store.create_appservice_txn( - service=service, - events=groups[service] - ) - service_is_up = yield self._is_service_up(service) - if service_is_up: - sent = yield txn.send(self.as_api) - if sent: - txn.complete(self.store) - else: - self._start_recoverer(service) + try: + groups = self.event_grouper.drain_groups() + for service in groups: + txn = yield self.store.create_appservice_txn( + service=service, + events=groups[service] + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: + txn.complete(self.store) + else: + self._start_recoverer(service) + except Exception as e: + logger.exception(e) self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks @@ -200,25 +203,32 @@ class _Recoverer(object): def recover(self): self.clock.call_later((2 ** self.backoff_counter), self.retry) + def _backoff(self): + # cap the backoff to be around 18h => (2^16) = 65536 secs + if self.backoff_counter < 16: + self.backoff_counter += 1 + self.recover() + @defer.inlineCallbacks def retry(self): - txn = yield self.store.get_oldest_unsent_txn(self.service) - if txn: - logger.info("Retrying transaction %s for AS ID %s", - txn.id, txn.service.id) - sent = yield txn.send(self.as_api) - if sent: - yield txn.complete(self.store) - # reset the backoff counter and retry immediately - self.backoff_counter = 1 - yield self.retry() + try: + txn = yield self.store.get_oldest_unsent_txn(self.service) + if txn: + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) + # reset the backoff counter and retry immediately + self.backoff_counter = 1 + yield self.retry() + else: + self._backoff() else: - # cap the backoff to be around 18h => (2^16) = 65536 secs - if self.backoff_counter < 16: - self.backoff_counter += 1 - self.recover() - else: - self._set_service_recovered() + self._set_service_recovered() + except Exception as e: + logger.exception(e) + self._backoff() def _set_service_recovered(self): self.callback(self) -- cgit 1.5.1 From 6279285b2ad59cf003b2e8d73d30dc706e1f3e4a Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 13:15:40 +0000 Subject: Replace EventGrouper for ServiceQueuer to move to push-based txns. Fix tests and add stub tests for ServiceQueuer. --- synapse/appservice/scheduler.py | 61 +++++++++++----------- tests/appservice/test_scheduler.py | 100 ++++++++++--------------------------- 2 files changed, 60 insertions(+), 101 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 59a870e271..54c42d1b94 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -16,11 +16,11 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: - _________ ----ASa[e]-->| Event | -----ASb[e]->| Grouper |<-poll 1/s--+ ---ASa[e]--->|_________| | ASa[e,e] ASb[e] - V + __________ +1---ASa[e]-->| Service |--> Queue ASa[f] +2----ASb[e]->| Queuer | +3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e] + V -````````- +------------+ |````````|<--StoreTxn-|Transaction | |Database| | Controller |---> SEND TO AS @@ -66,14 +66,14 @@ class AppServiceScheduler(object): self.clock = clock self.store = store self.as_api = as_api - self.event_grouper = _EventGrouper() def create_recoverer(service, callback): return _Recoverer(clock, store, as_api, service, callback) self.txn_ctrl = _TransactionController( - clock, store, as_api, self.event_grouper, create_recoverer + clock, store, as_api, create_recoverer ) + self.queuer = _ServiceQueuer(self.txn_ctrl) @defer.inlineCallbacks def start(self): @@ -86,17 +86,26 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.enqueue(service, event) + self.queuer.enqueue(service, event) -class _EventGrouper(object): - """Groups events for the same application service together. +class _ServiceQueuer(object): + """Queues events for the same application service together, sending + transactions as soon as possible. Once a transaction is sent successfully, + this schedules any other events in the queue to run. """ - def __init__(self): + def __init__(self, txn_ctrl): self.groups = {} # dict of {service: [events]} + self.txn_ctrl = txn_ctrl def enqueue(self, service, event): + # if nothing in queue for this service, send event immediately and add + # callbacks. + self.txn_ctrl.send(service, [event]) + + # else add to queue for this service + if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) @@ -109,34 +118,30 @@ class _EventGrouper(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): + def __init__(self, clock, store, as_api, recoverer_fn): self.clock = clock self.store = store self.as_api = as_api - self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn # keep track of how many recoverers there are self.recoverers = [] @defer.inlineCallbacks - def start_polling(self): + def send(self, service, events): try: - groups = self.event_grouper.drain_groups() - for service in groups: - txn = yield self.store.create_appservice_txn( - service=service, - events=groups[service] - ) - service_is_up = yield self._is_service_up(service) - if service_is_up: - sent = yield txn.send(self.as_api) - if sent: - txn.complete(self.store) - else: - self._start_recoverer(service) + txn = yield self.store.create_appservice_txn( + service=service, + events=events + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: + txn.complete(self.store) + else: + self._start_recoverer(service) except Exception as e: logger.exception(e) - self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 4534d05b93..38d792eb02 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - _EventGrouper, _TransactionController, _Recoverer + _ServiceQueuer, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -28,25 +28,21 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.clock = MockClock() self.store = Mock() self.as_api = Mock() - self.event_grouper = Mock() self.recoverer = Mock() self.recoverer_fn = Mock(return_value=self.recoverer) self.txnctrl = _TransactionController( clock=self.clock, store=self.store, as_api=self.as_api, - event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + recoverer_fn=self.recoverer_fn ) - def test_poll_single_group_service_up(self): + def test_single_service_up_txn_sent(self): # Test: The AS is up and the txn is successfully sent. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events txn_id = "foobar" txn = Mock(id=txn_id, service=service, events=events) # mock methods - self.event_grouper.drain_groups = Mock(return_value=groups) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) @@ -56,7 +52,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -64,15 +60,12 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made txn.complete.assert_called_once_with(self.store) # txn completed - def test_poll_single_group_service_down(self): + def test_single_service_down(self): # Test: The AS is down so it shouldn't push; Recoverers will do it. # It should still make a transaction though. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events - self.event_grouper.drain_groups = Mock(return_value=groups) txn = Mock(id="idhere", service=service, events=events) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.DOWN) @@ -82,7 +75,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -90,18 +83,15 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.assertEquals(0, txn.send.call_count) # txn not sent though self.assertEquals(0, txn.complete.call_count) # or completed - def test_poll_single_group_service_up(self): + def test_single_service_up_txn_not_sent(self): # Test: The AS is up and the txn is not sent. A Recoverer is made and # started. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events txn_id = "foobar" txn = Mock(id=txn_id, service=service, events=events) # mock methods - self.event_grouper.drain_groups = Mock(return_value=groups) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) @@ -112,7 +102,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events @@ -125,12 +115,6 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): service, ApplicationServiceState.DOWN # service marked as down ) - def test_poll_no_groups(self): - self.as_api.push_bulk = Mock() - self.event_grouper.drain_groups = Mock(return_value={}) - self.txnctrl.start_polling() - self.assertEquals(0, self.as_api.push_bulk.call_count) - class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): @@ -205,54 +189,24 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.callback.assert_called_once_with(self.recoverer) -class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): +class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): def setUp(self): - self.grouper = _EventGrouper() - - def test_drain_single_event(self): - service = Mock() - event = Mock() - self.grouper.enqueue(service, event) - groups = self.grouper.drain_groups() - self.assertTrue(service in groups) - self.assertEquals([event], groups[service]) - self.assertEquals(1, len(groups.keys())) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) - - def test_drain_multiple_events(self): - service = Mock() - events = [Mock(), Mock(), Mock()] - for e in events: - self.grouper.enqueue(service, e) - groups = self.grouper.drain_groups() - self.assertTrue(service in groups) - self.assertEquals(events, groups[service]) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) - - def test_drain_multiple_services(self): - services = [Mock(), Mock(), Mock()] - events_a = [Mock(), Mock()] - events_b = [Mock()] - events_c = [Mock(), Mock(), Mock(), Mock()] - mappings = { - services[0]: events_a, - services[1]: events_b, - services[2]: events_c - } - for e in events_b: - self.grouper.enqueue(services[1], e) - for e in events_c: - self.grouper.enqueue(services[2], e) - for e in events_a: - self.grouper.enqueue(services[0], e) - - groups = self.grouper.drain_groups() - for service in services: - self.assertTrue(service in groups) - self.assertEquals(mappings[service], groups[service]) - self.assertEquals(3, len(groups.keys())) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) + self.txn_ctrl = Mock() + self.queuer = _ServiceQueuer(self.txn_ctrl) + + def test_send_single_event_no_queue(self): + # Expect the event to be sent immediately. + pass + + def test_send_single_event_with_queue(self): + # - Send an event and don't resolve it just yet. + # - Send another event: expect send() to NOT be called. + # - Resolve the send event + # - Expect queued event to be sent + pass + + def test_multiple_service_queues(self): + # Tests that each service has its own queue, and that they don't block + # on each other. + pass -- cgit 1.5.1 From d04fa1f7121d996e05bd4def14951d89eb47d1ab Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 14:03:16 +0000 Subject: Implement ServiceQueuer with tests. --- synapse/appservice/scheduler.py | 46 +++++++++++++++++++++----------- tests/appservice/test_scheduler.py | 54 +++++++++++++++++++++++++++++++++----- 2 files changed, 77 insertions(+), 23 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 54c42d1b94..3cedd479a2 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -83,7 +83,6 @@ class AppServiceScheduler(object): self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered ) self.txn_ctrl.add_recoverers(recoverers) - self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) @@ -96,24 +95,37 @@ class _ServiceQueuer(object): """ def __init__(self, txn_ctrl): - self.groups = {} # dict of {service: [events]} + self.queued_events = {} # dict of {service_id: [events]} + self.pending_requests = {} # dict of {service_id: Deferred} self.txn_ctrl = txn_ctrl def enqueue(self, service, event): - # if nothing in queue for this service, send event immediately and add - # callbacks. - self.txn_ctrl.send(service, [event]) - - # else add to queue for this service - - if service not in self.groups: - self.groups[service] = [] - self.groups[service].append(event) - - def drain_groups(self): - groups = self.groups - self.groups = {} - return groups + # if this service isn't being sent something + if not self.pending_requests.get(service.id): + self._send_request(service, [event]) + else: + # add to queue for this service + if service.id not in self.queued_events: + self.queued_events[service.id] = [] + self.queued_events[service.id].append(event) + + def _send_request(self, service, events): + # send request and add callbacks + d = self.txn_ctrl.send(service, events) + d.addCallback(self._on_request_finish) + d.addErrback(self._on_request_fail) + self.pending_requests[service.id] = d + + def _on_request_finish(self, service): + self.pending_requests[service.id] = None + # if there are queued events, then send them. + if (service.id in self.queued_events + and len(self.queued_events[service.id]) > 0): + self._send_request(service, self.queued_events[service.id]) + self.queued_events[service.id] = [] + + def _on_request_fail(self, err): + logger.error("AS request failed: %s", err) class _TransactionController(object): @@ -142,6 +154,8 @@ class _TransactionController(object): self._start_recoverer(service) except Exception as e: logger.exception(e) + # request has finished + defer.returnValue(service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 38d792eb02..82a5965097 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -197,16 +197,56 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): def test_send_single_event_no_queue(self): # Expect the event to be sent immediately. - pass + service = Mock(id=4) + event = Mock() + self.queuer.enqueue(service, event) + self.txn_ctrl.send.assert_called_once_with(service, [event]) def test_send_single_event_with_queue(self): - # - Send an event and don't resolve it just yet. - # - Send another event: expect send() to NOT be called. - # - Resolve the send event - # - Expect queued event to be sent - pass + d = defer.Deferred() + self.txn_ctrl.send = Mock(return_value=d) + service = Mock(id=4) + event = Mock(event_id="first") + event2 = Mock(event_id="second") + event3 = Mock(event_id="third") + # Send an event and don't resolve it just yet. + self.queuer.enqueue(service, event) + # Send more events: expect send() to NOT be called multiple times. + self.queuer.enqueue(service, event2) + self.queuer.enqueue(service, event3) + self.txn_ctrl.send.assert_called_with(service, [event]) + self.assertEquals(1, self.txn_ctrl.send.call_count) + # Resolve the send event: expect the queued events to be sent + d.callback(service) + self.txn_ctrl.send.assert_called_with(service, [event2, event3]) + self.assertEquals(2, self.txn_ctrl.send.call_count) def test_multiple_service_queues(self): # Tests that each service has its own queue, and that they don't block # on each other. - pass + srv1 = Mock(id=4) + srv_1_defer = defer.Deferred() + srv_1_event = Mock(event_id="srv1a") + srv_1_event2 = Mock(event_id="srv1b") + + srv2 = Mock(id=6) + srv_2_defer = defer.Deferred() + srv_2_event = Mock(event_id="srv2a") + srv_2_event2 = Mock(event_id="srv2b") + + send_return_list = [srv_1_defer, srv_2_defer] + self.txn_ctrl.send = Mock(side_effect=lambda x,y: send_return_list.pop(0)) + + # send events for different ASes and make sure they are sent + self.queuer.enqueue(srv1, srv_1_event) + self.queuer.enqueue(srv1, srv_1_event2) + self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event]) + self.queuer.enqueue(srv2, srv_2_event) + self.queuer.enqueue(srv2, srv_2_event2) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event]) + + # make sure callbacks for a service only send queued events for THAT + # service + srv_2_defer.callback(srv2) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2]) + self.assertEquals(3, self.txn_ctrl.send.call_count) -- cgit 1.5.1 From f0d6f724a241a50d4a12b1c00af2a4cc6f9a43f1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 15:24:32 +0000 Subject: Set the service ID as soon as it is known. --- synapse/handlers/appservice.py | 2 +- synapse/storage/appservice.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index a24f7f5587..58b5b60bb7 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -59,13 +59,13 @@ class ApplicationServicesHandler(object): ) if not stored_service: raise StoreError(404, "Application service not found") + app_service.id = stored_service.id except StoreError: raise SynapseError( 403, "Unrecognised application services token. " "Consult the home server admin.", errcode=Codes.FORBIDDEN ) - app_service.hs_token = self._generate_hs_token() # create a sender for this application service which is used when diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e928812bc9..06b3a04afc 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -101,11 +101,12 @@ class ApplicationServiceStore(SQLBaseStore): if not service.hs_token: raise StoreError(500, "No HS token") - yield self.runInteraction( + as_id = yield self.runInteraction( "update_app_service", self._update_app_service_txn, service ) + service.id = as_id # update cache TODO: Should this be in the txn? for (index, cache_service) in enumerate(self.services_cache): @@ -124,7 +125,7 @@ class ApplicationServiceStore(SQLBaseStore): "update_app_service_txn: Failed to find as_id for token=", service.token ) - return False + return txn.execute( "UPDATE application_services SET url=?, hs_token=?, sender=? " @@ -144,7 +145,7 @@ class ApplicationServiceStore(SQLBaseStore): "as_id, namespace, regex) values(?,?,?)", (as_id, ns_int, json.dumps(regex_obj)) ) - return True + return as_id def _get_as_id_txn(self, txn, token): cursor = txn.execute( -- cgit 1.5.1 From ed008e85a8a2d9254d4d6f23cc7eb47ee52d0989 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Mar 2015 17:25:44 +0000 Subject: Reduce activity timer granularity to avoid too many quick updates (SYN-247) --- synapse/handlers/presence.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 731df00648..bbc7a0f200 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -33,6 +33,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) +# Don't bother bumping "last active" time if it differs by less than 60 seconds +LAST_ACTIVE_GRANULARITY = 60*1000 + + # TODO(paul): Maybe there's one of these I can steal from somewhere def partition(l, func): """Partition the list by the result of func applied to each element.""" @@ -282,6 +286,10 @@ class PresenceHandler(BaseHandler): if now is None: now = self.clock.time_msec() + prev_state = self._get_or_make_usercache(user) + if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY: + return + self.changed_presencelike_data(user, {"last_active": now}) def changed_presencelike_data(self, user, state): -- cgit 1.5.1 From d6b3ea75d4eba6961242ce68d5df90557b00609b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 19:04:59 +0000 Subject: Implement the 'key in dict' test for LruCache() --- synapse/util/lrucache.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse') diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py index 65d5792907..2f7b615f78 100644 --- a/synapse/util/lrucache.py +++ b/synapse/util/lrucache.py @@ -90,12 +90,16 @@ class LruCache(object): def cache_len(): return len(cache) + def cache_contains(key): + return key in cache + self.sentinel = object() self.get = cache_get self.set = cache_set self.setdefault = cache_set_default self.pop = cache_pop self.len = cache_len + self.contains = cache_contains def __getitem__(self, key): result = self.get(key, self.sentinel) @@ -114,3 +118,6 @@ class LruCache(object): def __len__(self): return self.len() + + def __contains__(self, key): + return self.contains(key) -- cgit 1.5.1 From 9ba6487b3fe985c4ec84b02d9804aea7e2df6c40 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 19:05:34 +0000 Subject: Allow a choice of LRU behaviour for Cache() by using LruCache() or OrderedDict() --- synapse/storage/_base.py | 20 ++++++++++++-------- tests/storage/test__base.py | 22 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 27ea65a0f6..6fa63f052e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -55,10 +55,14 @@ cache_counter = metrics.register_cache( class Cache(object): - def __init__(self, name, max_entries=1000, keylen=1): - self.cache = OrderedDict() + def __init__(self, name, max_entries=1000, keylen=1, lru=False): + if lru: + self.cache = LruCache(max_size=max_entries) + self.max_entries = None + else: + self.cache = OrderedDict() + self.max_entries = max_entries - self.max_entries = max_entries self.name = name self.keylen = keylen @@ -82,8 +86,9 @@ class Cache(object): if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) - while len(self.cache) > self.max_entries: - self.cache.popitem(last=False) + if self.max_entries is not None: + while len(self.cache) >= self.max_entries: + self.cache.popitem(last=False) self.cache[keyargs] = value @@ -94,9 +99,7 @@ class Cache(object): self.cache.pop(keyargs, None) -# TODO(paul): -# * consider other eviction strategies - LRU? -def cached(max_entries=1000, num_args=1): +def cached(max_entries=1000, num_args=1, lru=False): """ A method decorator that applies a memoizing cache around the function. The function is presumed to take zero or more arguments, which are used in @@ -115,6 +118,7 @@ def cached(max_entries=1000, num_args=1): name=orig.__name__, max_entries=max_entries, keylen=num_args, + lru=lru, ) @functools.wraps(orig) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index b6853ba2d4..96caf8c4c1 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -69,6 +69,28 @@ class CacheTestCase(unittest.TestCase): cache.get(2) cache.get(3) + def test_eviction_lru(self): + cache = Cache("test", max_entries=2, lru=True) + + cache.prefill(1, "one") + cache.prefill(2, "two") + + # Now access 1 again, thus causing 2 to be least-recently used + cache.get(1) + + cache.prefill(3, "three") + + failed = False + try: + cache.get(2) + except KeyError: + failed = True + + self.assertTrue(failed) + + cache.get(1) + cache.get(3) + class CacheDecoratorTestCase(unittest.TestCase): -- cgit 1.5.1 From 033a517febc434269eefc75e4d9646d015beae54 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 16:59:27 +0000 Subject: Indirect invalidations of _get_event_cache via a helper method to keep all uses of the cache lexically within one .py file --- synapse/storage/_base.py | 3 +++ synapse/storage/events.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6fa63f052e..374db1a304 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -732,6 +732,9 @@ class SQLBaseStore(object): return [e for e in events if e] + def _invalidate_get_event_cache(self, event_id): + self._get_event_cache.pop(event_id) + def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a86230d92c..2425f57f5f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -94,7 +94,7 @@ class EventsStore(SQLBaseStore): current_state=None): # Remove the any existing cache entries for the event_id - self._get_event_cache.pop(event.event_id) + self._invalidate_get_event_cache(event.event_id) # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table @@ -356,7 +356,7 @@ class EventsStore(SQLBaseStore): def _store_redaction(self, txn, event): # invalidate the cache for the redacted event - self._get_event_cache.pop(event.redacts) + self._invalidate_get_event_cache(event.redacts) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) -- cgit 1.5.1 From 1b988b051b203ec17352b7422be141e622b4fa42 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 17:26:32 +0000 Subject: Store the rejected reason in (Frozen)Event structs --- synapse/events/__init__.py | 6 ++++-- synapse/storage/_base.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 64e08223b0..e4495ccf12 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -46,9 +46,10 @@ def _event_dict_property(key): class EventBase(object): def __init__(self, event_dict, signatures={}, unsigned={}, - internal_metadata_dict={}): + internal_metadata_dict={}, rejected_reason=None): self.signatures = signatures self.unsigned = unsigned + self.rejected_reason = rejected_reason self._event_dict = event_dict @@ -109,7 +110,7 @@ class EventBase(object): class FrozenEvent(EventBase): - def __init__(self, event_dict, internal_metadata_dict={}): + def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): event_dict = dict(event_dict) # Signatures is a dict of dicts, and this is faster than doing a @@ -128,6 +129,7 @@ class FrozenEvent(EventBase): signatures=signatures, unsigned=unsigned, internal_metadata_dict=internal_metadata_dict, + rejected_reason=rejected_reason, ) @staticmethod diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 374db1a304..7f5ad9b0fb 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -784,6 +784,7 @@ class SQLBaseStore(object): txn, internal_metadata, js, redacted, check_redacted=check_redacted, get_prev_content=get_prev_content, + rejected_reason=rejected_reason, ) cache[(check_redacted, get_prev_content, allow_rejected)] = result return result @@ -791,7 +792,8 @@ class SQLBaseStore(object): return None def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False): + check_redacted=True, get_prev_content=False, + rejected_reason=None): start_time = time.time() * 1000 @@ -806,7 +808,11 @@ class SQLBaseStore(object): internal_metadata = json.loads(internal_metadata) start_time = update_counter("decode_internal", start_time) - ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) start_time = update_counter("build_frozen_event", start_time) if check_redacted and redacted: -- cgit 1.5.1 From f173d40a32cba919e088917fe42ac300a10e0ad2 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 17:33:26 +0000 Subject: Use FrozenEvent's reject_reason to decide whether to return it; don't include allow_rejected in the main getEvents cache key --- synapse/storage/_base.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7f5ad9b0fb..919295eabb 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -749,10 +749,13 @@ class SQLBaseStore(object): try: # Separate cache entries for each way to invoke _get_event_txn - ret = cache[(check_redacted, get_prev_content, allow_rejected)] - + ret = cache[(check_redacted, get_prev_content)] cache_counter.inc_hits("*getEvent*") - return ret + + if allow_rejected or not ret.rejected_reason: + return ret + else: + return None except KeyError: cache_counter.inc_misses("*getEvent*") pass @@ -779,14 +782,15 @@ class SQLBaseStore(object): start_time = update_counter("select_event", start_time) + result = self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=rejected_reason, + ) + cache[(check_redacted, get_prev_content)] = result + if allow_rejected or not rejected_reason: - result = self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=rejected_reason, - ) - cache[(check_redacted, get_prev_content, allow_rejected)] = result return result else: return None -- cgit 1.5.1 From 953e40f9dc086a47d811d1fe029734b3178266f3 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 19:12:16 +0000 Subject: Implement the main getEvent cache using Cache() instead of a custom application of LruCache; also unify its two-level structure into just one --- synapse/storage/_base.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 919295eabb..5c7bd22e64 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -241,10 +241,8 @@ class SQLBaseStore(object): self._txn_perf_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters() - self._get_event_cache = LruCache(hs.config.event_cache_size) - - # Pretend the getEventCache is just another named cache - caches_by_name["*getEvent*"] = self._get_event_cache + self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, + max_entries=hs.config.event_cache_size) def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -733,7 +731,9 @@ class SQLBaseStore(object): return [e for e in events if e] def _invalidate_get_event_cache(self, event_id): - self._get_event_cache.pop(event_id) + for check_redacted in (False, True): + for get_prev_content in (False, True): + self._get_event_cache.invalidate(event_id, check_redacted, get_prev_content) def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): @@ -745,19 +745,14 @@ class SQLBaseStore(object): sql_getevents_timer.inc_by(curr_time - last_time, desc) return curr_time - cache = self._get_event_cache.setdefault(event_id, {}) - try: - # Separate cache entries for each way to invoke _get_event_txn - ret = cache[(check_redacted, get_prev_content)] - cache_counter.inc_hits("*getEvent*") + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) if allow_rejected or not ret.rejected_reason: return ret else: return None except KeyError: - cache_counter.inc_misses("*getEvent*") pass finally: start_time = update_counter("event_cache", start_time) @@ -788,7 +783,7 @@ class SQLBaseStore(object): get_prev_content=get_prev_content, rejected_reason=rejected_reason, ) - cache[(check_redacted, get_prev_content)] = result + self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) if allow_rejected or not rejected_reason: return result -- cgit 1.5.1 From 32206dde3f8dd59412490cd6f590304438c900f4 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Mar 2015 10:11:52 +0000 Subject: Fixes from PR comments --- synapse/appservice/scheduler.py | 3 ++- synapse/storage/appservice.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3cedd479a2..59b0b1f4ac 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -112,7 +112,7 @@ class _ServiceQueuer(object): def _send_request(self, service, events): # send request and add callbacks d = self.txn_ctrl.send(service, events) - d.addCallback(self._on_request_finish) + d.addBoth(self._on_request_finish) d.addErrback(self._on_request_fail) self.pending_requests[service.id] = d @@ -154,6 +154,7 @@ class _TransactionController(object): self._start_recoverer(service) except Exception as e: logger.exception(e) + self._start_recoverer(service) # request has finished defer.returnValue(service) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 06b3a04afc..93304a745f 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.services_cache = [] - self.cache_defer = self._populate_cache() + self.cache_defer = self._populate_appservice_cache() self.cache_defer.addErrback(log_failure) @defer.inlineCallbacks @@ -337,7 +337,7 @@ class ApplicationServiceStore(SQLBaseStore): return service_list @defer.inlineCallbacks - def _populate_cache(self): + def _populate_appservice_cache(self): """Populates the ApplicationServiceCache from the database.""" sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " "application_services_regex AS r ON a.id = r.as_id") -- cgit 1.5.1 From a198894bf737566e368db546121aca026ed1fbeb Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 26 Mar 2015 11:53:58 +0000 Subject: Appease pep8 --- synapse/storage/_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5c7bd22e64..cf4c76d332 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -733,7 +733,8 @@ class SQLBaseStore(object): def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): for get_prev_content in (False, True): - self._get_event_cache.invalidate(event_id, check_redacted, get_prev_content) + self._get_event_cache.invalidate(event_id, check_redacted, + get_prev_content) def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): -- cgit 1.5.1 From ff1fa0fbf80cbb636e4cce59846bb5dcc91ccd03 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 27 Mar 2015 15:57:16 +0000 Subject: Add another @cached wrapper, this time on get_presence_state() --- synapse/storage/presence.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 87fba55439..e6fc19ccec 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from twisted.internet import defer + +from ._base import SQLBaseStore, cached class PresenceStore(SQLBaseStore): @@ -33,6 +35,7 @@ class PresenceStore(SQLBaseStore): desc="has_presence_state", ) + @cached() def get_presence_state(self, user_localpart): return self._simple_select_one( table="presence", @@ -41,8 +44,9 @@ class PresenceStore(SQLBaseStore): desc="get_presence_state", ) + @defer.inlineCallbacks def set_presence_state(self, user_localpart, new_state): - return self._simple_update_one( + ret = yield self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -50,6 +54,8 @@ class PresenceStore(SQLBaseStore): "mtime": self._clock.time_msec()}, desc="set_presence_state", ) + self.get_presence_state.invalidate(user_localpart) + defer.returnValue(ret) def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( -- cgit 1.5.1 From 3e420aebd86dbd641ddc07039b220420a43fc39c Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 27 Mar 2015 16:16:58 +0000 Subject: Revert "Add another @cached wrapper, this time on get_presence_state()" This reverts commit ff1fa0fbf80cbb636e4cce59846bb5dcc91ccd03. --- synapse/storage/presence.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index e6fc19ccec..87fba55439 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore class PresenceStore(SQLBaseStore): @@ -35,7 +33,6 @@ class PresenceStore(SQLBaseStore): desc="has_presence_state", ) - @cached() def get_presence_state(self, user_localpart): return self._simple_select_one( table="presence", @@ -44,9 +41,8 @@ class PresenceStore(SQLBaseStore): desc="get_presence_state", ) - @defer.inlineCallbacks def set_presence_state(self, user_localpart, new_state): - ret = yield self._simple_update_one( + return self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -54,8 +50,6 @@ class PresenceStore(SQLBaseStore): "mtime": self._clock.time_msec()}, desc="set_presence_state", ) - self.get_presence_state.invalidate(user_localpart) - defer.returnValue(ret) def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( -- cgit 1.5.1 From 8366fde82f609448bd96a882c72cea7d2baa52f0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 30 Mar 2015 12:01:09 -0400 Subject: turn --disable-registration into --enable-registration, given the default is for registration to be disabled by default now. this is backwards incompatible by removing the old --disable-registration arg, but makes for a much more intuitive arg --- synapse/config/registration.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'synapse') diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 4401e774d1..a6a2d2c5e1 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -25,11 +25,11 @@ class RegistrationConfig(Config): def __init__(self, args): super(RegistrationConfig, self).__init__(args) - # `args.disable_registration` may either be a bool or a string depending - # on if the option was given a value (e.g. --disable-registration=false - # would set `args.disable_registration` to "false" not False.) - self.disable_registration = bool( - distutils.util.strtobool(str(args.disable_registration)) + # `args.enable_registration` may either be a bool or a string depending + # on if the option was given a value (e.g. --enable-registration=true + # would set `args.enable_registration` to "true" not True.) + self.disable_registration = not bool( + distutils.util.strtobool(str(args.enable_registration)) ) self.registration_shared_secret = args.registration_shared_secret @@ -39,11 +39,11 @@ class RegistrationConfig(Config): reg_group = parser.add_argument_group("registration") reg_group.add_argument( - "--disable-registration", - const=True, - default=True, + "--enable-registration", + const=False, + default=False, nargs='?', - help="Disable registration of new users.", + help="Enable registration for new users.", ) reg_group.add_argument( "--registration-shared-secret", type=str, @@ -53,8 +53,8 @@ class RegistrationConfig(Config): @classmethod def generate_config(cls, args, config_dir_path): - if args.disable_registration is None: - args.disable_registration = True + if args.enable_registration is None: + args.enable_registration = False if args.registration_shared_secret is None: args.registration_shared_secret = random_string_with_symbols(50) -- cgit 1.5.1 From af853a4cdb4d6a15a9a249da8bf1aa5be1998aae Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 09:22:31 +0100 Subject: Add AppServiceConfig --- synapse/config/appservice.py | 31 +++++++++++++++++++++++++++++++ synapse/config/homeserver.py | 3 ++- 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 synapse/config/appservice.py (limited to 'synapse') diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py new file mode 100644 index 0000000000..399a716d80 --- /dev/null +++ b/synapse/config/appservice.py @@ -0,0 +1,31 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + + +class AppServiceConfig(Config): + + def __init__(self, args): + super(AppServiceConfig, self).__init__(args) + self.app_service_config_files = args.app_service_config_files + + @classmethod + def add_arguments(cls, parser): + super(AppServiceConfig, cls).add_arguments(parser) + group = parser.add_argument_group("appservice") + group.add_argument( + "--app-service-config-files", type=str, nargs='+', + help="A list of application service config files to use." + ) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 241afdf872..3edfadb98b 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -24,12 +24,13 @@ from .email import EmailConfig from .voip import VoipConfig from .registration import RegistrationConfig from .metrics import MetricsConfig +from .appservice import AppServiceConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, EmailConfig, VoipConfig, RegistrationConfig, - MetricsConfig,): + MetricsConfig, AppServiceConfig,): pass -- cgit 1.5.1 From e7887e37a86adbdc2dcb5bd3fbaabf836b168bd8 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 09:32:40 +0100 Subject: Remove appservice REST servlets --- synapse/app/homeserver.py | 5 -- synapse/rest/appservice/__init__.py | 14 ----- synapse/rest/appservice/v1/__init__.py | 29 ---------- synapse/rest/appservice/v1/base.py | 48 ----------------- synapse/rest/appservice/v1/register.py | 99 ---------------------------------- synapse/server.py | 1 - 6 files changed, 196 deletions(-) delete mode 100644 synapse/rest/appservice/__init__.py delete mode 100644 synapse/rest/appservice/v1/__init__.py delete mode 100644 synapse/rest/appservice/v1/base.py delete mode 100644 synapse/rest/appservice/v1/register.py (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 500cae05fb..29ca720d5e 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,7 +32,6 @@ from twisted.web.resource import Resource from twisted.web.static import File from twisted.web.server import Site from synapse.http.server import JsonResource, RootRedirect -from synapse.rest.appservice.v1 import AppServiceRestResource from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.http.server_key_resource import LocalKey @@ -78,9 +77,6 @@ class SynapseHomeServer(HomeServer): def build_resource_for_federation(self): return JsonResource(self) - def build_resource_for_app_services(self): - return AppServiceRestResource(self) - def build_resource_for_web_client(self): import syweb syweb_path = os.path.dirname(syweb.__file__) @@ -141,7 +137,6 @@ class SynapseHomeServer(HomeServer): (CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()), (SERVER_KEY_PREFIX, self.get_resource_for_server_key()), (MEDIA_PREFIX, self.get_resource_for_media_repository()), - (APP_SERVICE_PREFIX, self.get_resource_for_app_services()), (STATIC_PREFIX, self.get_resource_for_static_content()), ] diff --git a/synapse/rest/appservice/__init__.py b/synapse/rest/appservice/__init__.py deleted file mode 100644 index 1a84d94cd9..0000000000 --- a/synapse/rest/appservice/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/synapse/rest/appservice/v1/__init__.py b/synapse/rest/appservice/v1/__init__.py deleted file mode 100644 index a7877609ad..0000000000 --- a/synapse/rest/appservice/v1/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from . import register - -from synapse.http.server import JsonResource - - -class AppServiceRestResource(JsonResource): - """A resource for version 1 of the matrix application service API.""" - - def __init__(self, hs): - JsonResource.__init__(self, hs) - self.register_servlets(self, hs) - - @staticmethod - def register_servlets(appservice_resource, hs): - register.register_servlets(hs, appservice_resource) diff --git a/synapse/rest/appservice/v1/base.py b/synapse/rest/appservice/v1/base.py deleted file mode 100644 index 65d5bcf9be..0000000000 --- a/synapse/rest/appservice/v1/base.py +++ /dev/null @@ -1,48 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains base REST classes for constructing client v1 servlets. -""" - -from synapse.http.servlet import RestServlet -from synapse.api.urls import APP_SERVICE_PREFIX -import re - -import logging - - -logger = logging.getLogger(__name__) - - -def as_path_pattern(path_regex): - """Creates a regex compiled appservice path with the correct path - prefix. - - Args: - path_regex (str): The regex string to match. This should NOT have a ^ - as this will be prefixed. - Returns: - SRE_Pattern - """ - return re.compile("^" + APP_SERVICE_PREFIX + path_regex) - - -class AppServiceRestServlet(RestServlet): - """A base Synapse REST Servlet for the application services version 1 API. - """ - - def __init__(self, hs): - self.hs = hs - self.handler = hs.get_handlers().appservice_handler diff --git a/synapse/rest/appservice/v1/register.py b/synapse/rest/appservice/v1/register.py deleted file mode 100644 index ea24d88f79..0000000000 --- a/synapse/rest/appservice/v1/register.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains REST servlets to do with registration: /register""" -from twisted.internet import defer - -from base import AppServiceRestServlet, as_path_pattern -from synapse.api.errors import CodeMessageException, SynapseError -from synapse.storage.appservice import ApplicationService - -import json -import logging - -logger = logging.getLogger(__name__) - - -class RegisterRestServlet(AppServiceRestServlet): - """Handles AS registration with the home server. - """ - - PATTERN = as_path_pattern("/register$") - - @defer.inlineCallbacks - def on_POST(self, request): - params = _parse_json(request) - - # sanity check required params - try: - as_token = params["as_token"] - as_url = params["url"] - if (not isinstance(as_token, basestring) or - not isinstance(as_url, basestring)): - raise ValueError - except (KeyError, ValueError): - raise SynapseError( - 400, "Missed required keys: as_token(str) / url(str)." - ) - - try: - app_service = ApplicationService( - as_token, as_url, params["namespaces"] - ) - except ValueError as e: - raise SynapseError(400, e.message) - - app_service = yield self.handler.register(app_service) - hs_token = app_service.hs_token - - defer.returnValue((200, { - "hs_token": hs_token - })) - - -class UnregisterRestServlet(AppServiceRestServlet): - """Handles AS registration with the home server. - """ - - PATTERN = as_path_pattern("/unregister$") - - def on_POST(self, request): - params = _parse_json(request) - try: - as_token = params["as_token"] - if not isinstance(as_token, basestring): - raise ValueError - except (KeyError, ValueError): - raise SynapseError(400, "Missing required key: as_token(str)") - - yield self.handler.unregister(as_token) - - raise CodeMessageException(500, "Not implemented") - - -def _parse_json(request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise SynapseError(400, "Content must be a JSON object.") - return content - except ValueError as e: - logger.warn(e) - raise SynapseError(400, "Content not JSON.") - - -def register_servlets(hs, http_server): - RegisterRestServlet(hs).register(http_server) - UnregisterRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index c7772244ba..0bd87bdd77 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -79,7 +79,6 @@ class BaseHomeServer(object): 'resource_for_content_repo', 'resource_for_server_key', 'resource_for_media_repository', - 'resource_for_app_services', 'resource_for_metrics', 'event_sources', 'ratelimiter', -- cgit 1.5.1 From d33ae65efc14a18a8a690d39d6e9c81aaafa1062 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 11:00:00 +0100 Subject: Remove more reg/unreg methods. Read config not database for cache. --- synapse/handlers/appservice.py | 37 ------- synapse/storage/appservice.py | 219 +++++++++++---------------------------- tests/storage/test_appservice.py | 39 ------- 3 files changed, 59 insertions(+), 236 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 58b5b60bb7..59cf15b037 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -16,10 +16,8 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import Codes, StoreError, SynapseError from synapse.appservice import ApplicationService from synapse.types import UserID -import synapse.util.stringutils as stringutils import logging @@ -49,38 +47,6 @@ class ApplicationServicesHandler(object): self.scheduler = appservice_scheduler self.started_scheduler = False - @defer.inlineCallbacks - def register(self, app_service): - logger.info("Register -> %s", app_service) - # check the token is recognised - try: - stored_service = yield self.store.get_app_service_by_token( - app_service.token - ) - if not stored_service: - raise StoreError(404, "Application service not found") - app_service.id = stored_service.id - except StoreError: - raise SynapseError( - 403, "Unrecognised application services token. " - "Consult the home server admin.", - errcode=Codes.FORBIDDEN - ) - app_service.hs_token = self._generate_hs_token() - - # create a sender for this application service which is used when - # creating rooms, etc.. - account = yield self.hs.get_handlers().registration_handler.register() - app_service.sender = account[0] - - yield self.store.update_app_service(app_service) - defer.returnValue(app_service) - - @defer.inlineCallbacks - def unregister(self, token): - logger.info("Unregister as_token=%s", token) - yield self.store.unregister_app_service(token) - @defer.inlineCallbacks def notify_interested_services(self, event): """Notifies (pushes) all application services interested in this event. @@ -223,6 +189,3 @@ class ApplicationServicesHandler(object): exists = yield self.query_user_exists(user_id) defer.returnValue(exists) defer.returnValue(True) - - def _generate_hs_token(self): - return stringutils.random_string(24) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 93304a745f..fe9372a7c6 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import yaml from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import StoreError from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -27,141 +27,18 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) -def log_failure(failure): - logger.error("Failed to detect application services: %s", failure.value) - logger.error(failure.getTraceback()) - - class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.services_cache = [] - self.cache_defer = self._populate_appservice_cache() - self.cache_defer.addErrback(log_failure) - - @defer.inlineCallbacks - def unregister_app_service(self, token): - """Unregisters this service. - - This removes all AS specific regex and the base URL. The token is the - only thing preserved for future registration attempts. - """ - yield self.cache_defer # make sure the cache is ready - yield self.runInteraction( - "unregister_app_service", - self._unregister_app_service_txn, - token, - ) - # update cache TODO: Should this be in the txn? - for service in self.services_cache: - if service.token == token: - service.url = None - service.namespaces = None - service.hs_token = None - - def _unregister_app_service_txn(self, txn, token): - # kill the url to prevent pushes - txn.execute( - "UPDATE application_services SET url=NULL WHERE token=?", - (token,) - ) - - # cleanup regex - as_id = self._get_as_id_txn(txn, token) - if not as_id: - logger.warning( - "unregister_app_service_txn: Failed to find as_id for token=", - token - ) - return False - - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) - ) - return True - - @defer.inlineCallbacks - def update_app_service(self, service): - """Update an application service, clobbering what was previously there. - - Args: - service(ApplicationService): The updated service. - """ - yield self.cache_defer # make sure the cache is ready - - # NB: There is no "insert" since we provide no public-facing API to - # allocate new ASes. It relies on the server admin inserting the AS - # token into the database manually. - - if not service.token or not service.url: - raise StoreError(400, "Token and url must be specified.") - - if not service.hs_token: - raise StoreError(500, "No HS token") - - as_id = yield self.runInteraction( - "update_app_service", - self._update_app_service_txn, - service + self._populate_appservice_cache( + hs.config.app_service_config_files ) - service.id = as_id - # update cache TODO: Should this be in the txn? - for (index, cache_service) in enumerate(self.services_cache): - if service.token == cache_service.token: - self.services_cache[index] = service - logger.info("Updated: %s", service) - return - # new entry - self.services_cache.append(service) - logger.info("Updated(new): %s", service) - - def _update_app_service_txn(self, txn, service): - as_id = self._get_as_id_txn(txn, service.token) - if not as_id: - logger.warning( - "update_app_service_txn: Failed to find as_id for token=", - service.token - ) - return - - txn.execute( - "UPDATE application_services SET url=?, hs_token=?, sender=? " - "WHERE id=?", - (service.url, service.hs_token, service.sender, as_id,) - ) - # cleanup regex - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) - ) - for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST): - if ns_str in service.namespaces: - for regex_obj in service.namespaces[ns_str]: - txn.execute( - "INSERT INTO application_services_regex(" - "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, json.dumps(regex_obj)) - ) - return as_id - - def _get_as_id_txn(self, txn, token): - cursor = txn.execute( - "SELECT id FROM application_services WHERE token=?", - (token,) - ) - res = cursor.fetchone() - if res: - return res[0] - - @defer.inlineCallbacks def get_app_services(self): - yield self.cache_defer # make sure the cache is ready - defer.returnValue(self.services_cache) + defer.succeed(self.services_cache) - @defer.inlineCallbacks def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -175,37 +52,24 @@ class ApplicationServiceStore(SQLBaseStore): Returns: synapse.appservice.ApplicationService or None. """ - - yield self.cache_defer # make sure the cache is ready - for service in self.services_cache: if service.sender == user_id: - defer.returnValue(service) + defer.succeed(service) return - defer.returnValue(None) + defer.succeed(None) - @defer.inlineCallbacks - def get_app_service_by_token(self, token, from_cache=True): + def get_app_service_by_token(self, token): """Get the application service with the given appservice token. Args: token (str): The application service token. - from_cache (bool): True to get this service from the cache, False to - check the database. - Raises: - StoreError if there was a problem retrieving this service. + Returns: + synapse.appservice.ApplicationService or None. """ - yield self.cache_defer # make sure the cache is ready - - if from_cache: - for service in self.services_cache: - if service.token == token: - defer.returnValue(service) - return - defer.returnValue(None) - - # TODO: The from_cache=False impl - # TODO: This should be JOINed with the application_services_regex table. + for service in self.services_cache: + if service.token == token: + return defer.succeed(service) + defer.succeed(None) def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -336,18 +200,53 @@ class ApplicationServiceStore(SQLBaseStore): )) return service_list - @defer.inlineCallbacks - def _populate_appservice_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " - "application_services_regex AS r ON a.id = r.as_id") - - results = yield self._execute_and_decode("appservice_cache", sql) - services = self._parse_services_dict(results) + def _load_appservice(self, as_info): + required_string_fields = ["url", "as_token", "hs_token", "sender"] + for field in required_string_fields: + if not isinstance(as_info.get(field), basestring): + raise KeyError("Required string field: '%s'", field) + + # namespace checks + if not isinstance(as_info.get("namespaces"), dict): + raise KeyError("Requires 'namespaces' object.") + for ns in ApplicationService.NS_LIST: + # specific namespaces are optional + if ns in as_info["namespaces"]: + # expect a list of dicts with exclusive and regex keys + for regex_obj in as_info["namespaces"][ns]: + if not isinstance(regex_obj, dict): + raise ValueError( + "Expected namespace entry in %s to be an object," + " but got %s", ns, regex_obj + ) + if not isinstance(regex_obj.get("regex"), basestring): + raise ValueError( + "Missing/bad type 'regex' key in %s", regex_obj + ) + if not isinstance(regex_obj.get("exclusive"), bool): + raise ValueError( + "Missing/bad type 'exclusive' key in %s", regex_obj + ) + return ApplicationService( + token=as_info["as_token"], + url=as_info["url"], + namespaces=as_info["namespaces"], + hs_token=as_info["hs_token"], + sender=as_info["sender"] + ) - for service in services: - logger.info("Found application service: %s", service) - self.services_cache.append(service) + def _populate_appservice_cache(self, config_files): + """Populates a cache of Application Services from the config files.""" + for config_file in config_files: + try: + with open(config_file, 'r') as f: + as_info = yaml.load(f) + appservice = self._load_appservice(as_info) + logger.info("Loaded application service: %s", appservice) + self.services_cache.append(appservice) + except Exception as e: + logger.error("Failed to load appservice from '%s'", config_file) + logger.exception(e) class ApplicationServiceTransactionStore(SQLBaseStore): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index e79599f7fb..82bfea15a6 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -49,45 +49,6 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): # must be done after inserts self.store = ApplicationServiceStore(hs) - @defer.inlineCallbacks - def test_update_and_retrieval_of_service(self): - url = "https://matrix.org/appservices/foobar" - hs_token = "hstok" - user_regex = [ - {"regex": "@foobar_.*:matrix.org", "exclusive": True} - ] - alias_regex = [ - {"regex": "#foobar_.*:matrix.org", "exclusive": False} - ] - room_regex = [ - - ] - service = ApplicationService( - url=url, hs_token=hs_token, token=self.as_token, namespaces={ - ApplicationService.NS_USERS: user_regex, - ApplicationService.NS_ALIASES: alias_regex, - ApplicationService.NS_ROOMS: room_regex - }) - yield self.store.update_app_service(service) - - stored_service = yield self.store.get_app_service_by_token( - self.as_token - ) - self.assertEquals(stored_service.token, self.as_token) - self.assertEquals(stored_service.url, url) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_ALIASES], - alias_regex - ) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_ROOMS], - room_regex - ) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_USERS], - user_regex - ) - @defer.inlineCallbacks def test_retrieve_unknown_service_token(self): service = yield self.store.get_app_service_by_token("invalid_token") -- cgit 1.5.1 From b59aa745560608c8503421bd9542c99fc1c571b5 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 11:35:45 +0100 Subject: Fix tests and missing returns on deferreds. --- synapse/appservice/__init__.py | 2 +- synapse/storage/appservice.py | 18 +++++++++++------- tests/storage/test_appservice.py | 41 ++++++++++++++++++++++++++++------------ 3 files changed, 41 insertions(+), 20 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index c60db16b74..a8108c1efb 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -95,7 +95,7 @@ class ApplicationService(object): # rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...], # } if not namespaces: - return None + namespaces = {} for ns in ApplicationService.NS_LIST: if ns not in namespaces: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index fe9372a7c6..a520a859d3 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): ) def get_app_services(self): - defer.succeed(self.services_cache) + return defer.succeed(self.services_cache) def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -54,9 +54,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.sender == user_id: - defer.succeed(service) - return - defer.succeed(None) + return defer.succeed(service) + return defer.succeed(None) def get_app_service_by_token(self, token): """Get the application service with the given appservice token. @@ -69,7 +68,7 @@ class ApplicationServiceStore(SQLBaseStore): for service in self.services_cache: if service.token == token: return defer.succeed(service) - defer.succeed(None) + return defer.succeed(None) def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -237,11 +236,16 @@ class ApplicationServiceStore(SQLBaseStore): def _populate_appservice_cache(self, config_files): """Populates a cache of Application Services from the config files.""" + if not isinstance(config_files, list): + logger.warning( + "Expected %s to be a list of AS config files.", config_files + ) + return + for config_file in config_files: try: with open(config_file, 'r') as f: - as_info = yaml.load(f) - appservice = self._load_appservice(as_info) + appservice = self._load_appservice(yaml.load(f)) logger.info("Loaded application service: %s", appservice) self.services_cache.append(appservice) except Exception as e: diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 82bfea15a6..b856438fd2 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -22,6 +22,8 @@ from synapse.storage.appservice import ( ) import json +import os +import yaml from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -30,25 +32,40 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): + self.as_yaml_files = [] db_pool = SQLiteMemoryDbPool() yield db_pool.prepare() hs = HomeServer( - "test", db_pool=db_pool, clock=MockClock(), config=Mock() + "test", db_pool=db_pool, clock=MockClock(), + config=Mock( + app_service_config_files=self.as_yaml_files + ) ) + self.as_token = "token1" - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", - (self.as_token,) - ) - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", ("token2",) - ) - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", ("token3",) - ) + self.as_url = "some_url" + self._add_appservice(self.as_token, self.as_url, "some_hs_token", "bob") + self._add_appservice("token2", "some_url", "some_hs_token", "bob") + self._add_appservice("token3", "some_url", "some_hs_token", "bob") # must be done after inserts self.store = ApplicationServiceStore(hs) + def tearDown(self): + # TODO: suboptimal that we need to create files for tests! + for f in self.as_yaml_files: + try: + os.remove(f) + except: + pass + + def _add_appservice(self, as_token, url, hs_token, sender): + as_yaml = dict(url=url, as_token=as_token, hs_token=hs_token, + sender=sender, namespaces={}) + # use the token as the filename + with open(as_token, 'w') as outfile: + outfile.write(yaml.dump(as_yaml)) + self.as_yaml_files.append(as_token) + @defer.inlineCallbacks def test_retrieve_unknown_service_token(self): service = yield self.store.get_app_service_by_token("invalid_token") @@ -60,7 +77,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): self.as_token ) self.assertEquals(stored_service.token, self.as_token) - self.assertEquals(stored_service.url, None) + self.assertEquals(stored_service.url, self.as_url) self.assertEquals( stored_service.namespaces[ApplicationService.NS_ALIASES], [] -- cgit 1.5.1 From c217504949a90712f41a0422215f923b4d114a17 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 12:07:56 +0100 Subject: Edit SQL schema to use string IDs not ints. Use token as ID. Update tests. --- synapse/storage/appservice.py | 23 ++++++++----- .../storage/schema/delta/15/appservice_txns.sql | 7 ++-- tests/storage/test_appservice.py | 38 +++++++++++++--------- 3 files changed, 40 insertions(+), 28 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a520a859d3..a8780eca1e 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -231,7 +231,8 @@ class ApplicationServiceStore(SQLBaseStore): url=as_info["url"], namespaces=as_info["namespaces"], hs_token=as_info["hs_token"], - sender=as_info["sender"] + sender=as_info["sender"], + id=as_info["as_token"] # the token is the only unique thing here ) def _populate_appservice_cache(self, config_files): @@ -268,16 +269,20 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to a list of ApplicationServices, which may be empty. """ - sql = ( - "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" - " application_services AS a ON a.id=s.as_id LEFT JOIN" - " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" - ) - results = yield self._execute_and_decode( - "get_appservices_by_state", sql, state + results = yield self._simple_select_list( + "application_services_state", + dict(state=state), + ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore - defer.returnValue(self._parse_services_dict(results)) + as_list = yield self.get_app_services() + services = [] + + for res in results: + for service in as_list: + if service.id == res["as_id"]: + services.append(service) + defer.returnValue(services) @defer.inlineCallbacks def get_appservice_state(self, service): diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 13bbb2de2e..2b27e2a429 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -14,14 +14,13 @@ */ CREATE TABLE IF NOT EXISTS application_services_state( - as_id INTEGER PRIMARY KEY, + as_id TEXT PRIMARY KEY, state TEXT, - last_txn TEXT, - FOREIGN KEY(as_id) REFERENCES application_services(id) + last_txn TEXT ); CREATE TABLE IF NOT EXISTS application_services_txns( - as_id INTEGER NOT NULL, + as_id TEXT NOT NULL, txn_id INTEGER NOT NULL, event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index b856438fd2..58551e40b9 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -101,42 +101,48 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): + self.as_yaml_files = [] self.db_pool = SQLiteMemoryDbPool() yield self.db_pool.prepare() - hs = HomeServer( - "test", db_pool=self.db_pool, clock=MockClock(), config=Mock() - ) self.as_list = [ { "token": "token1", "url": "https://matrix-as.org", - "id": 3 + "id": "token1" }, { "token": "alpha_tok", "url": "https://alpha.com", - "id": 5 + "id": "alpha_tok" }, { "token": "beta_tok", "url": "https://beta.com", - "id": 6 + "id": "beta_tok" }, { "token": "delta_tok", "url": "https://delta.com", - "id": 7 + "id": "delta_tok" }, ] for s in self.as_list: - yield self._add_service(s["id"], s["url"], s["token"]) - self.store = TestTransactionStore(hs) + yield self._add_service(s["url"], s["token"]) - def _add_service(self, as_id, url, token): - return self.db_pool.runQuery( - "INSERT INTO application_services(id, url, token) VALUES(?,?,?)", - (as_id, url, token) + hs = HomeServer( + "test", db_pool=self.db_pool, clock=MockClock(), config=Mock( + app_service_config_files=self.as_yaml_files + ) ) + self.store = TestTransactionStore(hs) + + def _add_service(self, url, as_token): + as_yaml = dict(url=url, as_token=as_token, hs_token="something", + sender="a_sender", namespaces={}) + # use the token as the filename + with open(as_token, 'w') as outfile: + outfile.write(yaml.dump(as_yaml)) + self.as_yaml_files.append(as_token) def _set_state(self, id, state, txn=None): return self.db_pool.runQuery( @@ -388,8 +394,10 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ApplicationServiceState.DOWN ) self.assertEquals(2, len(services)) - self.assertEquals(self.as_list[2]["id"], services[0].id) - self.assertEquals(self.as_list[0]["id"], services[1].id) + self.assertEquals( + set([self.as_list[2]["id"], self.as_list[0]["id"]]), + set([services[0].id, services[1].id]) + ) # required for ApplicationServiceTransactionStoreTestCase tests -- cgit 1.5.1 From 3470cb36a81052d4968d109f99ecbad210b0c820 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 13:03:31 +0100 Subject: Pyflakes --- synapse/app/homeserver.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 29ca720d5e..afb46d2e23 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -38,8 +38,7 @@ from synapse.http.server_key_resource import LocalKey from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.api.urls import ( CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, APP_SERVICE_PREFIX, - STATIC_PREFIX + SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, STATIC_PREFIX ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -- cgit 1.5.1 From cf1fa59f4b72dbf5c9d735eaf051f1456721d91f Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 13:48:03 +0100 Subject: Use a sender localpart instead of a user ID. Form the user ID at runtime instead, This gives less room for error in AS config files since they cannot specify the domain of another HS. --- synapse/storage/appservice.py | 11 +++++++++-- tests/storage/test_appservice.py | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a8780eca1e..557e377ca5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser +from synapse.types import UserID from ._base import SQLBaseStore @@ -31,6 +32,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) + self.hostname = hs.hostname self.services_cache = [] self._populate_appservice_cache( hs.config.app_service_config_files @@ -200,11 +202,16 @@ class ApplicationServiceStore(SQLBaseStore): return service_list def _load_appservice(self, as_info): - required_string_fields = ["url", "as_token", "hs_token", "sender"] + required_string_fields = [ + "url", "as_token", "hs_token", "sender_localpart" + ] for field in required_string_fields: if not isinstance(as_info.get(field), basestring): raise KeyError("Required string field: '%s'", field) + user = UserID(as_info["sender_localpart"], self.hostname) + user_id = user.to_string() + # namespace checks if not isinstance(as_info.get("namespaces"), dict): raise KeyError("Requires 'namespaces' object.") @@ -231,7 +238,7 @@ class ApplicationServiceStore(SQLBaseStore): url=as_info["url"], namespaces=as_info["namespaces"], hs_token=as_info["hs_token"], - sender=as_info["sender"], + sender=user_id, id=as_info["as_token"] # the token is the only unique thing here ) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 58551e40b9..675959c56c 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -60,7 +60,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): def _add_appservice(self, as_token, url, hs_token, sender): as_yaml = dict(url=url, as_token=as_token, hs_token=hs_token, - sender=sender, namespaces={}) + sender_localpart=sender, namespaces={}) # use the token as the filename with open(as_token, 'w') as outfile: outfile.write(yaml.dump(as_yaml)) @@ -138,7 +138,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): def _add_service(self, url, as_token): as_yaml = dict(url=url, as_token=as_token, hs_token="something", - sender="a_sender", namespaces={}) + sender_localpart="a_sender", namespaces={}) # use the token as the filename with open(as_token, 'w') as outfile: outfile.write(yaml.dump(as_yaml)) -- cgit 1.5.1 From 5e88a09a424b8ce65bfe9a809cfd245286474de3 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 14:00:25 +0100 Subject: Add same user_id char checks as registration. --- synapse/storage/appservice.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 557e377ca5..f8cbb3f323 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import urllib import yaml from simplejson import JSONDecodeError import simplejson as json @@ -209,7 +210,12 @@ class ApplicationServiceStore(SQLBaseStore): if not isinstance(as_info.get(field), basestring): raise KeyError("Required string field: '%s'", field) - user = UserID(as_info["sender_localpart"], self.hostname) + localpart = as_info["sender_localpart"] + if urllib.quote(localpart) != localpart: + raise ValueError( + "sender_localpart needs characters which are not URL encoded." + ) + user = UserID(localpart, self.hostname) user_id = user.to_string() # namespace checks -- cgit 1.5.1 From 09cbff174a01757d10107b7960972a484153323e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 16:43:49 +0100 Subject: Fix thinko whereby events *for the AS specifically* were not passed on. This was caused by not explicitly checking the service.sender field. This has now been fixed and a regression test has been added. --- synapse/appservice/__init__.py | 5 ++++- tests/appservice/test_appservice.py | 13 +++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index c60db16b74..4a6cdbc2be 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -199,7 +199,10 @@ class ApplicationService(object): return self._matches_user(event, member_list) def is_interested_in_user(self, user_id): - return self._matches_regex(user_id, ApplicationService.NS_USERS) + return ( + self._matches_regex(user_id, ApplicationService.NS_USERS) + or user_id == self.sender + ) def is_interested_in_alias(self, alias): return self._matches_regex(alias, ApplicationService.NS_ALIASES) diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index eb7becf725..62149d6902 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -199,6 +199,19 @@ class ApplicationServiceTestCase(unittest.TestCase): aliases_for_event=["#xmpp_barfoo:matrix.org"] )) + def test_interested_in_self(self): + # make sure invites get through + self.service.sender = "@appservice:name" + self.service.namespaces[ApplicationService.NS_USERS].append( + _regex("@irc_.*") + ) + self.event.type = "m.room.member" + self.event.content = { + "membership": "invite" + } + self.event.state_key = self.service.sender + self.assertTrue(self.service.is_interested(self.event)) + def test_member_list_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") -- cgit 1.5.1 From 813e54bd5b332e4514ecfea71d33d27f106fe5ff Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 1 Apr 2015 14:05:24 +0100 Subject: Fix more AS sender ID thinkos. Specifically, the ASes own user ID wasn't being treated as 'exclusive' so a human could nab it. Also, the HS would needlessly send user queries to the AS for its own user ID. --- synapse/appservice/__init__.py | 5 ++++- synapse/handlers/appservice.py | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index ab0a6955f0..63a18b802b 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -211,7 +211,10 @@ class ApplicationService(object): return self._matches_regex(room_id, ApplicationService.NS_ROOMS) def is_exclusive_user(self, user_id): - return self._is_exclusive(ApplicationService.NS_USERS, user_id) + return ( + self._is_exclusive(ApplicationService.NS_USERS, user_id) + or user_id == self.sender + ) def is_exclusive_alias(self, alias): return self._is_exclusive(ApplicationService.NS_ALIASES, alias) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 59cf15b037..492a630fdc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -180,7 +180,14 @@ class ApplicationServicesHandler(object): return user_info = yield self.store.get_user_by_id(user_id) - defer.returnValue(len(user_info) == 0) + if len(user_info) > 0: + defer.returnValue(False) + return + + # user not found; could be the AS though, so check. + services = yield self.store.get_app_services() + service_list = [s for s in services if s.sender == user_id] + defer.returnValue(len(service_list) == 0) @defer.inlineCallbacks def _check_user_exists(self, user_id): -- cgit 1.5.1 From 5583e29513f1f67012b98b430670d645928d4195 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 1 Apr 2015 19:04:55 +0100 Subject: Report process open filehandles in metrics --- synapse/metrics/__init__.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) (limited to 'synapse') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index dffb8a4861..6564b03eee 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,6 +18,8 @@ from __future__ import absolute_import import logging from resource import getrusage, getpagesize, RUSAGE_SELF +import os +import stat from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric @@ -109,3 +111,35 @@ resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000) # pages resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * PAGE_SIZE) + +TYPES = { + stat.S_IFSOCK: "SOCK", + stat.S_IFLNK: "LNK", + stat.S_IFREG: "REG", + stat.S_IFBLK: "BLK", + stat.S_IFDIR: "DIR", + stat.S_IFCHR: "CHR", + stat.S_IFIFO: "FIFO", +} + +def _process_fds(): + counts = {(k,): 0 for k in TYPES.values()} + counts[("other",)] = 0 + + for fd in os.listdir("/proc/self/fd"): + try: + s = os.stat("/proc/self/fd/%s" % (fd)) + fmt = stat.S_IFMT(s.st_mode) + if fmt in TYPES: + t = TYPES[fmt] + else: + t = "other" + + counts[(t,)] += 1 + except OSError: + # the dirh itself used by listdir() is usually missing by now + pass + + return counts + +get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"]) -- cgit 1.5.1 From ef1e019840ee8ba17b45754ef223a710ce23553c Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 1 Apr 2015 19:17:38 +0100 Subject: Appease pep8 --- synapse/metrics/__init__.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 6564b03eee..9233ea3da9 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -122,6 +122,7 @@ TYPES = { stat.S_IFIFO: "FIFO", } + def _process_fds(): counts = {(k,): 0 for k in TYPES.values()} counts[("other",)] = 0 -- cgit 1.5.1 From ae8ff92e05eb511b21206ec303056c36e00df61c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 7 Apr 2015 15:48:20 +0100 Subject: Fix a bug which causes a send event level of 0 to not be honoured. Caused by a bad if check, which incorrectly executes for both 0 and None, when None was the original intent. --- synapse/api/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 64f605b962..18f3d117b3 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -486,7 +486,7 @@ class Auth(object): send_level = send_level_event.content.get("events", {}).get( event.type ) - if not send_level: + if send_level is None: if hasattr(event, "state_key"): send_level = send_level_event.content.get( "state_default", 50 -- cgit 1.5.1 From 0775c624698dbe5a837280e729ec488fd0dda28e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Apr 2015 18:16:23 +0100 Subject: Fix --enable-registration flag to work if you don't give a value --- demo/start.sh | 4 ++-- synapse/config/registration.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/demo/start.sh b/demo/start.sh index d647400d39..0485be8053 100755 --- a/demo/start.sh +++ b/demo/start.sh @@ -33,8 +33,8 @@ for port in 8080 8081 8082; do --manhole $((port + 1000)) \ --tls-dh-params-path "demo/demo.tls.dh" \ --media-store-path "demo/media_store.$port" \ - $PARAMS $SYNAPSE_PARAMS \ - --enable-registration + $PARAMS $SYNAPSE_PARAMS \ + --enable-registration python -m synapse.app.homeserver \ --config-path "demo/etc/$port.config" \ diff --git a/synapse/config/registration.py b/synapse/config/registration.py index a6a2d2c5e1..d5c8f4bf7b 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -40,7 +40,7 @@ class RegistrationConfig(Config): reg_group.add_argument( "--enable-registration", - const=False, + const=True, default=False, nargs='?', help="Enable registration for new users.", -- cgit 1.5.1 From c1b34af441e06e8efd775a47410de784ae9c94ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:12:38 +0100 Subject: Move database timer logging to seperate logger --- synapse/storage/_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6fa63f052e..53eee10d51 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -35,6 +35,7 @@ logger = logging.getLogger(__name__) sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") +perf_logger = logging.getLogger("synapse.storage.TIME") metrics = synapse.metrics.get_metrics_for("synapse.storage") @@ -268,7 +269,7 @@ class SQLBaseStore(object): time_now - time_then, limit=3 ) - logger.info( + perf_logger.info( "Total database time: %.3f%% {%s} {%s}", ratio * 100, top_three_counters, top_3_event_counters ) -- cgit 1.5.1 From 07d404170934db8bc3aa3ae8ac89ceb25cd2e9a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:27:36 +0100 Subject: Fix bug where we didn't inform the NotificataionListeners about new rooms they have been subscribed to. This meant that the listeners didn't clean themselves up fully from all the dicts --- synapse/notifier.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index 7121d659d0..b12b54353e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -427,3 +427,6 @@ class Notifier(object): listeners = self.room_to_listeners.setdefault(room_id, set()) listeners |= new_listeners + + for l in new_listeners: + l.rooms.add(room_id) -- cgit 1.5.1 From 65f5e4e3e43ee471ee0a8c6989bbf60cb3be2c95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:31:06 +0100 Subject: Add paranoia checks to make sure that we evict stale NotificationListeners when we are about to process them --- synapse/notifier.py | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index b12b54353e..ce9b0d2187 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -62,7 +62,8 @@ class _NotificationListener(object): self.rooms = rooms - self.pending_notifications = [] + def notified(self): + return self.deferred.called def notify(self, notifier, events, start_token, end_token): """ Inform whoever is listening about the new events. This will @@ -78,11 +79,15 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass + # Should the following be done be using intrusively linked lists? + # -- erikj + for room in self.rooms: lst = notifier.room_to_listeners.get(room, set()) lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: notifier.appservice_to_listeners.get( self.appservice, set() @@ -161,10 +166,24 @@ class Notifier(object): room_source = self.event_sources.sources["room"] - listeners = self.room_to_listeners.get(room_id, set()).copy() + room_listeners = self.room_to_listeners.get(room_id, set()) + + # Remove any 'stale' listeners. + for l in room_listeners.copy(): + if l.notified(): + room_listeners.discard(l) + + listeners = room_listeners.copy() for user in extra_users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + # Remove any 'stale' listeners. + for l in user_listeners.copy(): + if l.notified(): + user_listeners.discard(l) + + listeners |= user_listeners for appservice in self.appservice_to_listeners: # TODO (kegan): Redundant appservice listener checks? @@ -173,9 +192,16 @@ class Notifier(object): # receive *invites* for users they are interested in. Does this # make the room_to_listeners check somewhat obselete? if appservice.is_interested(event): - listeners |= self.appservice_to_listeners.get( + app_listeners = self.appservice_to_listeners.get( appservice, set() - ).copy() + ) + + # Remove any 'stale' listeners. + for l in app_listeners.copy(): + if l.notified(): + app_listeners.discard(l) + + listeners |= app_listeners logger.debug("on_new_room_event listeners %s", listeners) -- cgit 1.5.1 From 830d07db8278d773338fee94eb269eafd6b1b7fc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:40:20 +0100 Subject: Also perform paranoia checks in 'on_new_user_event' --- synapse/notifier.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index ce9b0d2187..be78082021 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -252,10 +252,24 @@ class Notifier(object): listeners = set() for user in users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + # Remove any 'stale' listeners. + for l in user_listeners.copy(): + if l.notified(): + user_listeners.discard(l) + + listeners |= user_listeners for room in rooms: - listeners |= self.room_to_listeners.get(room, set()).copy() + room_listeners = self.room_to_listeners.get(room, set()) + + # Remove any 'stale' listeners. + for l in room_listeners.copy(): + if l.notified(): + room_listeners.discard(l) + + listeners |= room_listeners @defer.inlineCallbacks def notify(listener): -- cgit 1.5.1 From 638be5a6b971bf961ee030d96245f296eb83e612 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:58:32 +0100 Subject: Factor out loops into '_discard_if_notified' --- synapse/notifier.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index be78082021..754569ebd2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -169,9 +169,7 @@ class Notifier(object): room_listeners = self.room_to_listeners.get(room_id, set()) # Remove any 'stale' listeners. - for l in room_listeners.copy(): - if l.notified(): - room_listeners.discard(l) + _discard_if_notified(room_listeners) listeners = room_listeners.copy() @@ -179,9 +177,7 @@ class Notifier(object): user_listeners = self.user_to_listeners.get(user, set()) # Remove any 'stale' listeners. - for l in user_listeners.copy(): - if l.notified(): - user_listeners.discard(l) + _discard_if_notified(user_listeners) listeners |= user_listeners @@ -197,9 +193,7 @@ class Notifier(object): ) # Remove any 'stale' listeners. - for l in app_listeners.copy(): - if l.notified(): - app_listeners.discard(l) + _discard_if_notified(app_listeners) listeners |= app_listeners @@ -255,9 +249,7 @@ class Notifier(object): user_listeners = self.user_to_listeners.get(user, set()) # Remove any 'stale' listeners. - for l in user_listeners.copy(): - if l.notified(): - user_listeners.discard(l) + _discard_if_notified(user_listeners) listeners |= user_listeners @@ -265,9 +257,7 @@ class Notifier(object): room_listeners = self.room_to_listeners.get(room, set()) # Remove any 'stale' listeners. - for l in room_listeners.copy(): - if l.notified(): - room_listeners.discard(l) + _discard_if_notified(room_listeners) listeners |= room_listeners @@ -470,3 +460,12 @@ class Notifier(object): for l in new_listeners: l.rooms.add(room_id) + + +def _discard_if_notified(listener_set): + to_discard = set() + for l in listener_set: + if l.notified(): + to_discard.add(l) + + listener_set -= to_discard -- cgit 1.5.1 From 5bc41fe9f8d40ecf4070c7ffb8df635dcccb4efe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 14:01:22 +0100 Subject: Move comment into docstring --- synapse/notifier.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index 754569ebd2..12573f3f59 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -168,7 +168,6 @@ class Notifier(object): room_listeners = self.room_to_listeners.get(room_id, set()) - # Remove any 'stale' listeners. _discard_if_notified(room_listeners) listeners = room_listeners.copy() @@ -176,7 +175,6 @@ class Notifier(object): for user in extra_users: user_listeners = self.user_to_listeners.get(user, set()) - # Remove any 'stale' listeners. _discard_if_notified(user_listeners) listeners |= user_listeners @@ -192,7 +190,6 @@ class Notifier(object): appservice, set() ) - # Remove any 'stale' listeners. _discard_if_notified(app_listeners) listeners |= app_listeners @@ -248,7 +245,6 @@ class Notifier(object): for user in users: user_listeners = self.user_to_listeners.get(user, set()) - # Remove any 'stale' listeners. _discard_if_notified(user_listeners) listeners |= user_listeners @@ -256,7 +252,6 @@ class Notifier(object): for room in rooms: room_listeners = self.room_to_listeners.get(room, set()) - # Remove any 'stale' listeners. _discard_if_notified(room_listeners) listeners |= room_listeners @@ -463,6 +458,8 @@ class Notifier(object): def _discard_if_notified(listener_set): + """Remove any 'stale' listeners from the given set. + """ to_discard = set() for l in listener_set: if l.notified(): -- cgit 1.5.1 From e8f152160591ccdfd93dd378a57d2322159cecc6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 14:08:30 +0100 Subject: Don't yield on notifying all listeners --- synapse/handlers/federation.py | 40 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 15ba417e06..ccbbf540f4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -201,10 +201,18 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - yield self.notifier.on_new_room_event( + d = self.notifier.on_new_room_event( event, extra_users=extra_users ) + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + event.event_id, f.value + ) + + d.addErrback(log_failure) + if event.type == EventTypes.Member: if event.membership == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -427,10 +435,18 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - yield self.notifier.on_new_room_event( + d = self.notifier.on_new_room_event( new_event, extra_users=[joinee] ) + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + event.event_id, f.value + ) + + d.addErrback(log_failure) + logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -500,10 +516,18 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - yield self.notifier.on_new_room_event( + d = self.notifier.on_new_room_event( event, extra_users=extra_users ) + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + event.event_id, f.value + ) + + d.addErrback(log_failure) + if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -574,10 +598,18 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - yield self.notifier.on_new_room_event( + d = self.notifier.on_new_room_event( event, extra_users=[target_user], ) + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + event.event_id, f.value + ) + + d.addErrback(log_failure) + defer.returnValue(event) @defer.inlineCallbacks -- cgit 1.5.1 From 19234cc6c371ab073e18dac1024d5c8cf101d410 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 14:10:06 +0100 Subject: typo --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ccbbf540f4..8aceac28cf 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -442,7 +442,7 @@ class FederationHandler(BaseHandler): def log_failure(f): logger.warn( "Failed to notify about %s: %s", - event.event_id, f.value + new_event.event_id, f.value ) d.addErrback(log_failure) -- cgit 1.5.1 From ccda401dbf6a9f55818dfd48a6cb9f7b8a2662fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 16:34:23 +0100 Subject: SYN-338: Fix typo that caused the cache to throw an exception in some instances --- synapse/util/expiringcache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/util/expiringcache.py b/synapse/util/expiringcache.py index 1c7859297a..06d1eea01b 100644 --- a/synapse/util/expiringcache.py +++ b/synapse/util/expiringcache.py @@ -65,7 +65,7 @@ class ExpiringCache(object): if self._max_len and len(self._cache.keys()) > self._max_len: sorted_entries = sorted( self._cache.items(), - key=lambda k, v: v.time, + key=lambda (k, v): v.time, ) for k, _ in sorted_entries[self._max_len:]: -- cgit 1.5.1 From 45131b2bca72ca1e9729c99de53e29c498d31138 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 16:35:12 +0100 Subject: Bump version --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/__init__.py b/synapse/__init__.py index 749a60329c..fd87c7e2d0 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.8.1-r2" +__version__ = "0.8.1-r3" -- cgit 1.5.1 From 6f9dea7483ed01d17522857c5b103971a0050d8f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:07:20 +0100 Subject: SYN-339: Cancel the notifier timeout when the notifier fires --- synapse/notifier.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index 12573f3f59..0fa77d28ca 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -59,8 +59,8 @@ class _NotificationListener(object): self.limit = limit self.timeout = timeout self.deferred = deferred - self.rooms = rooms + self.timer = None def notified(self): return self.deferred.called @@ -93,6 +93,13 @@ class _NotificationListener(object): self.appservice, set() ).discard(self) + # Cancel the timeout for this notifer if one exists. + if self.timer is not None: + try: + notifier.clock.cancel_call_later(self.timer) + except: + logger.exception("Failed to cancel notifier timer") + class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -325,14 +332,20 @@ class Notifier(object): self._register_with_keys(listener[0]) result = yield callback() + timer = [None] + if timeout: timed_out = [False] def _timeout_listener(): timed_out[0] = True + timer[0] = None listener[0].notify(self, [], from_token, from_token) - self.clock.call_later(timeout/1000., _timeout_listener) + # We create multiple notification listeners so we have to manage + # canceling the timeout ourselves. + timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) + while not result and not timed_out[0]: yield deferred deferred = defer.Deferred() @@ -347,6 +360,12 @@ class Notifier(object): self._register_with_keys(listener[0]) result = yield callback() + if timer[0] is not None: + try: + self.clock.cancel_call_later(timer[0]) + except: + logger.exception("Failed to cancel notifer timer") + defer.returnValue(result) def get_events_for(self, user, rooms, pagination_config, timeout): @@ -400,8 +419,11 @@ class Notifier(object): if not timeout: _timeout_listener() else: - self.clock.call_later(timeout/1000.0, _timeout_listener) - + # Only add the timer if the listener hasn't been notified + if not listener.notified(): + listener.timer = self.clock.call_later( + timeout/1000.0, _timeout_listener + ) return @log_function -- cgit 1.5.1 From 23d285ad57ca76e8ff2d33f1f6e476930689d9a7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:41:50 +0100 Subject: Unset the timer in the timeout callback so that we don't try to cancel it if it has been called --- synapse/notifier.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index 0fa77d28ca..e6f37c3736 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -404,6 +404,7 @@ class Notifier(object): def _timeout_listener(): # TODO (erikj): We should probably set to_token to the current # max rather than reusing from_token. + listener.timer = None listener.notify( self, [], -- cgit 1.5.1 From 1280a47fc671b718239e06030d469d99aa5ea513 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:42:21 +0100 Subject: Add comment --- synapse/notifier.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index e6f37c3736..d750a6fcf7 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -404,6 +404,7 @@ class Notifier(object): def _timeout_listener(): # TODO (erikj): We should probably set to_token to the current # max rather than reusing from_token. + # Remove the timer from the listener so we don't try to cancel it. listener.timer = None listener.notify( self, -- cgit 1.5.1 From 3cbc286d062977e192a10525040a5e713d4c97e0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 14 Apr 2015 13:28:11 +0100 Subject: Move server key api into rest/key/v1 --- synapse/app/homeserver.py | 2 +- synapse/http/server_key_resource.py | 93 ------------------------------ synapse/rest/key/__init__.py | 14 +++++ synapse/rest/key/v1/__init__.py | 14 +++++ synapse/rest/key/v1/server_key_resource.py | 93 ++++++++++++++++++++++++++++++ 5 files changed, 122 insertions(+), 94 deletions(-) delete mode 100644 synapse/http/server_key_resource.py create mode 100644 synapse/rest/key/__init__.py create mode 100644 synapse/rest/key/v1/__init__.py create mode 100644 synapse/rest/key/v1/server_key_resource.py (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index afb46d2e23..27e53a9e56 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -34,7 +34,7 @@ from twisted.web.server import Site from synapse.http.server import JsonResource, RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource -from synapse.http.server_key_resource import LocalKey +from synapse.rest.key.v1.server_key_resource import LocalKey from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.api.urls import ( CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, diff --git a/synapse/http/server_key_resource.py b/synapse/http/server_key_resource.py deleted file mode 100644 index 71e9a51f5c..0000000000 --- a/synapse/http/server_key_resource.py +++ /dev/null @@ -1,93 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.web.resource import Resource -from synapse.http.server import respond_with_json_bytes -from syutil.crypto.jsonsign import sign_json -from syutil.base64util import encode_base64 -from syutil.jsonutil import encode_canonical_json -from OpenSSL import crypto -import logging - - -logger = logging.getLogger(__name__) - - -class LocalKey(Resource): - """HTTP resource containing encoding the TLS X.509 certificate and NACL - signature verification keys for this server:: - - GET /key HTTP/1.1 - - HTTP/1.1 200 OK - Content-Type: application/json - { - "server_name": "this.server.example.com" - "verify_keys": { - "algorithm:version": # base64 encoded NACL verification key. - }, - "tls_certificate": # base64 ASN.1 DER encoded X.509 tls cert. - "signatures": { - "this.server.example.com": { - "algorithm:version": # NACL signature for this server. - } - } - } - """ - - def __init__(self, hs): - self.hs = hs - self.version_string = hs.version_string - self.response_body = encode_canonical_json( - self.response_json_object(hs.config) - ) - Resource.__init__(self) - - @staticmethod - def response_json_object(server_config): - verify_keys = {} - for key in server_config.signing_key: - verify_key_bytes = key.verify_key.encode() - key_id = "%s:%s" % (key.alg, key.version) - verify_keys[key_id] = encode_base64(verify_key_bytes) - - x509_certificate_bytes = crypto.dump_certificate( - crypto.FILETYPE_ASN1, - server_config.tls_certificate - ) - json_object = { - u"server_name": server_config.server_name, - u"verify_keys": verify_keys, - u"tls_certificate": encode_base64(x509_certificate_bytes) - } - for key in server_config.signing_key: - json_object = sign_json( - json_object, - server_config.server_name, - key, - ) - - return json_object - - def render_GET(self, request): - return respond_with_json_bytes( - request, 200, self.response_body, - version_string=self.version_string - ) - - def getChild(self, name, request): - if name == '': - return self diff --git a/synapse/rest/key/__init__.py b/synapse/rest/key/__init__.py new file mode 100644 index 0000000000..1a84d94cd9 --- /dev/null +++ b/synapse/rest/key/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/rest/key/v1/__init__.py b/synapse/rest/key/v1/__init__.py new file mode 100644 index 0000000000..1a84d94cd9 --- /dev/null +++ b/synapse/rest/key/v1/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/rest/key/v1/server_key_resource.py b/synapse/rest/key/v1/server_key_resource.py new file mode 100644 index 0000000000..71e9a51f5c --- /dev/null +++ b/synapse/rest/key/v1/server_key_resource.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.web.resource import Resource +from synapse.http.server import respond_with_json_bytes +from syutil.crypto.jsonsign import sign_json +from syutil.base64util import encode_base64 +from syutil.jsonutil import encode_canonical_json +from OpenSSL import crypto +import logging + + +logger = logging.getLogger(__name__) + + +class LocalKey(Resource): + """HTTP resource containing encoding the TLS X.509 certificate and NACL + signature verification keys for this server:: + + GET /key HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + { + "server_name": "this.server.example.com" + "verify_keys": { + "algorithm:version": # base64 encoded NACL verification key. + }, + "tls_certificate": # base64 ASN.1 DER encoded X.509 tls cert. + "signatures": { + "this.server.example.com": { + "algorithm:version": # NACL signature for this server. + } + } + } + """ + + def __init__(self, hs): + self.hs = hs + self.version_string = hs.version_string + self.response_body = encode_canonical_json( + self.response_json_object(hs.config) + ) + Resource.__init__(self) + + @staticmethod + def response_json_object(server_config): + verify_keys = {} + for key in server_config.signing_key: + verify_key_bytes = key.verify_key.encode() + key_id = "%s:%s" % (key.alg, key.version) + verify_keys[key_id] = encode_base64(verify_key_bytes) + + x509_certificate_bytes = crypto.dump_certificate( + crypto.FILETYPE_ASN1, + server_config.tls_certificate + ) + json_object = { + u"server_name": server_config.server_name, + u"verify_keys": verify_keys, + u"tls_certificate": encode_base64(x509_certificate_bytes) + } + for key in server_config.signing_key: + json_object = sign_json( + json_object, + server_config.server_name, + key, + ) + + return json_object + + def render_GET(self, request): + return respond_with_json_bytes( + request, 200, self.response_body, + version_string=self.version_string + ) + + def getChild(self, name, request): + if name == '': + return self -- cgit 1.5.1 From 88cb06e996349d9a2e69d5f29dafe764d35b7966 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 14 Apr 2015 16:18:17 +0100 Subject: Update syutil version to 0.0.4 --- synapse/config/server.py | 2 +- synapse/python_dependencies.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/config/server.py b/synapse/config/server.py index 58a828cc4c..d4c223f348 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -110,7 +110,7 @@ class ServerConfig(Config): with open(args.signing_key_path, "w") as signing_key_file: syutil.crypto.signing_key.write_signing_keys( signing_key_file, - (syutil.crypto.signing_key.generate_singing_key("auto"),), + (syutil.crypto.signing_key.generate_signing_key("auto"),), ) else: signing_keys = cls.read_file(args.signing_key_path, "signing_key") diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 6b6d5508b8..dac927d0a7 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -4,7 +4,7 @@ from distutils.version import LooseVersion logger = logging.getLogger(__name__) REQUIREMENTS = { - "syutil>=0.0.3": ["syutil"], + "syutil>=0.0.4": ["syutil"], "Twisted==14.0.2": ["twisted==14.0.2"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], "pyopenssl>=0.14": ["OpenSSL>=0.14"], @@ -43,8 +43,8 @@ DEPENDENCY_LINKS = [ ), github_link( project="matrix-org/syutil", - version="v0.0.3", - egg="syutil-0.0.3", + version="v0.0.4", + egg="syutil-0.0.4", ), github_link( project="matrix-org/matrix-angular-sdk", -- cgit 1.5.1 From e6e130b9ba702873d1fdf8788abf718e38e64419 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 15 Apr 2015 18:07:33 +0100 Subject: Ensure that non-room-members cannot ban others, even if they do have enough powerlevel (SYN-343) --- synapse/api/auth.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 18f3d117b3..97801631f5 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -272,6 +272,11 @@ class Auth(object): 403, "You cannot kick user %s." % target_user_id ) elif Membership.BAN == membership: + if not caller_in_room: # caller isn't joined + raise AuthError( + 403, + "%s not in room %s." % (event.user_id, event.room_id,) + ) if user_level < ban_level: raise AuthError(403, "You don't have permission to ban") else: -- cgit 1.5.1 From 399b5add58da4104141500a3bb49cc35dd754563 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 15 Apr 2015 18:40:23 +0100 Subject: Neater implementation of membership change auth checks, ensuring we can't forget to check if the calling user is a member of the room --- synapse/api/auth.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 97801631f5..e159e4503f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -215,17 +215,20 @@ class Auth(object): else: ban_level = 50 # FIXME (erikj): What should we do here? - if Membership.INVITE == membership: - # TODO (erikj): We should probably handle this more intelligently - # PRIVATE join rules. - - # Invites are valid iff caller is in the room and target isn't. + if Membership.JOIN != membership: + # JOIN is the only action you can perform if you're not in the room if not caller_in_room: # caller isn't joined raise AuthError( 403, "%s not in room %s." % (event.user_id, event.room_id,) ) - elif target_banned: + + if Membership.INVITE == membership: + # TODO (erikj): We should probably handle this more intelligently + # PRIVATE join rules. + + # Invites are valid iff caller is in the room and target isn't. + if target_banned: raise AuthError( 403, "%s is banned from the room" % (target_user_id,) ) @@ -251,13 +254,7 @@ class Auth(object): raise AuthError(403, "You are not allowed to join this room") elif Membership.LEAVE == membership: # TODO (erikj): Implement kicks. - - if not caller_in_room: # trying to leave a room you aren't joined - raise AuthError( - 403, - "%s not in room %s." % (target_user_id, event.room_id,) - ) - elif target_banned and user_level < ban_level: + if target_banned and user_level < ban_level: raise AuthError( 403, "You cannot unban user &s." % (target_user_id,) ) @@ -272,11 +269,6 @@ class Auth(object): 403, "You cannot kick user %s." % target_user_id ) elif Membership.BAN == membership: - if not caller_in_room: # caller isn't joined - raise AuthError( - 403, - "%s not in room %s." % (event.user_id, event.room_id,) - ) if user_level < ban_level: raise AuthError(403, "You don't have permission to ban") else: -- cgit 1.5.1 From 0268d40281313c9a89e7b4356ae2e5f77a622857 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 15 Apr 2015 23:09:35 +0100 Subject: Have TypingNotificationEventSource.get_new_events_for_user() return a deferred, for consistency and extensibility --- synapse/handlers/typing.py | 2 +- tests/handlers/test_typing.py | 18 ++++++++++++------ tests/rest/client/v1/test_typing.py | 3 ++- 3 files changed, 15 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c2762f92c7..05879fbfc6 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -252,7 +252,7 @@ class TypingNotificationEventSource(object): # TODO: check if user is in room events.append(self._make_event_for(room_id)) - return (events, handler._latest_room_serial) + return defer.succeed((events, handler._latest_room_serial)) def get_current_key(self): return self.handler()._latest_room_serial diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index bf34b7ccbd..39590115e0 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -175,8 +175,9 @@ class TypingNotificationsTestCase(unittest.TestCase): ]) self.assertEquals(self.event_source.get_current_key(), 1) + events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) self.assertEquals( - self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + events[0], [ {"type": "m.typing", "room_id": self.room_id, @@ -237,8 +238,9 @@ class TypingNotificationsTestCase(unittest.TestCase): ]) self.assertEquals(self.event_source.get_current_key(), 1) + events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) self.assertEquals( - self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + events[0], [ {"type": "m.typing", "room_id": self.room_id, @@ -292,8 +294,9 @@ class TypingNotificationsTestCase(unittest.TestCase): yield put_json.await_calls() self.assertEquals(self.event_source.get_current_key(), 1) + events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) self.assertEquals( - self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + events[0], [ {"type": "m.typing", "room_id": self.room_id, @@ -322,8 +325,9 @@ class TypingNotificationsTestCase(unittest.TestCase): self.on_new_user_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 1) + events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) self.assertEquals( - self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + events[0], [ {"type": "m.typing", "room_id": self.room_id, @@ -340,8 +344,9 @@ class TypingNotificationsTestCase(unittest.TestCase): ]) self.assertEquals(self.event_source.get_current_key(), 2) + events = yield self.event_source.get_new_events_for_user(self.u_apple, 1, None) self.assertEquals( - self.event_source.get_new_events_for_user(self.u_apple, 1, None)[0], + events[0], [ {"type": "m.typing", "room_id": self.room_id, @@ -366,8 +371,9 @@ class TypingNotificationsTestCase(unittest.TestCase): self.on_new_user_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 3) + events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) self.assertEquals( - self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + events[0], [ {"type": "m.typing", "room_id": self.room_id, diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 80f2ec9ddf..d04e5abda4 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -115,8 +115,9 @@ class RoomTypingTestCase(RestTestCase): self.assertEquals(200, code) self.assertEquals(self.event_source.get_current_key(), 1) + events = yield self.event_source.get_new_events_for_user(self.user_id, 0, None) self.assertEquals( - self.event_source.get_new_events_for_user(self.user_id, 0, None)[0], + events[0], [ {"type": "m.typing", "room_id": self.room_id, -- cgit 1.5.1 From f2cf37518b2ad663fb8fb721258fc4fffed8f5b2 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 15 Apr 2015 23:34:16 +0100 Subject: Filter typing nofication events to only those rooms the requesting user is a member of (SYN-328) --- synapse/handlers/typing.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 05879fbfc6..c0b2bd7db0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -223,6 +223,7 @@ class TypingNotificationEventSource(object): def __init__(self, hs): self.hs = hs self._handler = None + self._room_member_handler = None def handler(self): # Avoid cyclic dependency in handler setup @@ -230,6 +231,11 @@ class TypingNotificationEventSource(object): self._handler = self.hs.get_handlers().typing_notification_handler return self._handler + def room_member_handler(self): + if not self._room_member_handler: + self._room_member_handler = self.hs.get_handlers().room_member_handler + return self._room_member_handler + def _make_event_for(self, room_id): typing = self.handler()._room_typing[room_id] return { @@ -240,19 +246,25 @@ class TypingNotificationEventSource(object): }, } + @defer.inlineCallbacks def get_new_events_for_user(self, user, from_key, limit): from_key = int(from_key) handler = self.handler() + joined_room_ids = ( + yield self.room_member_handler().get_joined_rooms_for_user(user) + ) + events = [] for room_id in handler._room_serials: + if room_id not in joined_room_ids: + continue if handler._room_serials[room_id] <= from_key: continue - # TODO: check if user is in room events.append(self._make_event_for(room_id)) - return defer.succeed((events, handler._latest_room_serial)) + defer.returnValue((events, handler._latest_room_serial)) def get_current_key(self): return self.handler()._latest_room_serial -- cgit 1.5.1