From be09c23ff02bee9c63611df528a269fb157f2f3c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2015 15:40:07 +0000 Subject: Add txn_id kwarg to push methods --- synapse/appservice/api.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c2179f8d55..c17fb219c5 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -72,11 +72,16 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(False) @defer.inlineCallbacks - def push_bulk(self, service, events): + def push_bulk(self, service, events, txn_id=None): events = self._serialize(events) + if txn_id is None: + logger.warning("push_bulk: Missing txn ID sending events to %s", + service.url) + txn_id = str(0) + uri = service.url + ("/transactions/%s" % - urllib.quote(str(0))) # TODO txn_ids + urllib.quote(txn_id)) response = None try: response = yield self.put_json( @@ -97,8 +102,8 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(False) @defer.inlineCallbacks - def push(self, service, event): - response = yield self.push_bulk(service, [event]) + def push(self, service, event, txn_id=None): + response = yield self.push_bulk(service, [event], txn_id) defer.returnValue(response) def _serialize(self, events): -- cgit 1.5.1 From e3190711911f166c5599acb66929f222498b212a Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2015 16:30:33 +0000 Subject: Add stub scheduler module for txn reliability --- synapse/appservice/scheduler.py | 68 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 synapse/appservice/scheduler.py (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py new file mode 100644 index 0000000000..a5060808d3 --- /dev/null +++ b/synapse/appservice/scheduler.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +This module controls the reliability for application service transactions. + +The nominal flow through this module looks like: + ___________ + \O/ --- event -->| | +--------------+ + | - event ---->| EventPool |<-- poll 1/s for events ---| EventSorter | + / \ ---- event ->|___________| +--------------+ + USERS ____________________________| + | | | + V V V + ASa ASb ASc + [e,e] [e] [e,e,e] + | + V + -````````- +------------+ + |````````|<--StoreTxn-|Transaction | + |Database| | Maker |---> SEND TO AS + `--------` +------------+ +What happens on SEND TO AS depends on the state of the Application Service: + - If the AS is marked as DOWN, do nothing. + - If the AS is marked as UP, send the transaction. + * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn + contents from the db. + * FAILURE : Marked AS as DOWN and start Recoverer. + +Recoverer attempts to recover ASes who have died. The flow for this looks like: + ,--------------------- backoff++ --------------. + V | + START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE + backoff DB and try to send it + ^ |__________ +Mark AS as | V +UP & quit +---------- YES SUCCESS + | | | + NO <--- Have more txns? <------ Mark txn success & nuke -+ + from db; incr AS pos. +""" + + +class EventPool(object): + pass + + +class EventSorter(object): + pass + + +class TransactionMaker(object): + pass + + +class Recoverer(object): + pass -- cgit 1.5.1 From 773cb3b6880851f318147b59f16c2c882d280a6e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2015 17:35:07 +0000 Subject: Add stub architecture for txn reliability. --- synapse/appservice/scheduler.py | 121 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 10 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index a5060808d3..3162fbec11 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -18,7 +18,7 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: ___________ \O/ --- event -->| | +--------------+ - | - event ---->| EventPool |<-- poll 1/s for events ---| EventSorter | + | - event ---->| event_pool|<-- poll 1/s for events ---| EventSorter | / \ ---- event ->|___________| +--------------+ USERS ____________________________| | | | @@ -29,7 +29,7 @@ The nominal flow through this module looks like: V -````````- +------------+ |````````|<--StoreTxn-|Transaction | - |Database| | Maker |---> SEND TO AS + |Database| | Controller |---> SEND TO AS `--------` +------------+ What happens on SEND TO AS depends on the state of the Application Service: - If the AS is marked as DOWN, do nothing. @@ -49,20 +49,121 @@ UP & quit +---------- YES SUCCESS | | | NO <--- Have more txns? <------ Mark txn success & nuke -+ from db; incr AS pos. + +This is all tied together by the AppServiceScheduler which DIs the required +components. """ -class EventPool(object): - pass +class AppServiceScheduler(object): + """ Public facing API for this module. Does the required DI to tie the + components together. This also serves as the "event_pool", which in this + case is a simple array. + """ + + def __init__(self, store, as_api, services): + self.app_services = services + self.event_pool = [] + + def create_recoverer(service): + return _Recoverer(store, as_api, service) + self.txn_ctrl = _TransactionController(store, as_api, create_recoverer) + + self.event_sorter = _EventSorter(self, self.txn_ctrl, services) + + def start(self): + self.event_sorter.start_polling() + + def store_event(self, event): # event_pool + self.event_pool.append(event) + + def get_events(self): # event_pool + return self.event_pool + + +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + # sends this transaction using this as_api + pass + + def complete(self, store): + # increment txn id on AS and nuke txn contents from db + pass + + +class _EventSorter(object): + + def __init__(self, event_pool, txn_ctrl, services): + self.event_pool = event_pool + self.txn_ctrl = txn_ctrl + self.services = services + + def start_polling(self): + events = self.event_pool.get_events() + if events: + self._process(events) + # repoll later on + + def _process(self, events): + # sort events + # f.e. (AS, events) => poke transaction controller + pass + + +class _TransactionController(object): + + def __init__(self, store, as_api, recoverer_fn): + self.store = store + self.as_api = as_api + self.recoverer_fn = recoverer_fn + + def on_receive_events(self, service, events): + txn = self._store_txn(service, events) + if txn.send(self.as_api): + txn.complete(self.store) + else: + self._start_recoverer(service) + + def _start_recoverer(self, service): + recoverer = self.recoverer_fn(service) + recoverer.recover() + + def _store_txn(self, service, events): + pass # returns AppServiceTransaction + + +class _Recoverer(object): + def __init__(self, store, as_api, service): + self.store = store + self.as_api = as_api + self.service = service + self.backoff_counter = 1 -class EventSorter(object): - pass + def recover(self): + # TODO wait a bit + txn = self._get_oldest_txn() + if txn: + if txn.send(self.as_api): + txn.complete(self.store) + else: + self.backoff_counter += 1 + self.recover(self.service) + return + else: + self._set_service_recovered(self.service) + def _set_service_recovered(self, service): + pass -class TransactionMaker(object): - pass + def _get_oldest_txn(self): + pass # returns AppServiceTransaction -class Recoverer(object): - pass -- cgit 1.5.1 From 0c838f9f5ecec5c0f93d194a00fb82d3877c2c09 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2015 17:45:52 +0000 Subject: Minor tweaks --- synapse/appservice/scheduler.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3162fbec11..27271e468d 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,6 +49,7 @@ UP & quit +---------- YES SUCCESS | | | NO <--- Have more txns? <------ Mark txn success & nuke -+ from db; incr AS pos. + Reset backoff. This is all tied together by the AppServiceScheduler which DIs the required components. @@ -77,7 +78,7 @@ class AppServiceScheduler(object): def store_event(self, event): # event_pool self.event_pool.append(event) - def get_events(self): # event_pool + def drain_events(self): # event_pool return self.event_pool @@ -90,11 +91,11 @@ class AppServiceTransaction(object): self.events = events def send(self, as_api): - # sends this transaction using this as_api + # TODO sends this transaction using this as_api pass def complete(self, store): - # increment txn id on AS and nuke txn contents from db + # TODO increment txn id on AS and nuke txn contents from db pass @@ -106,14 +107,14 @@ class _EventSorter(object): self.services = services def start_polling(self): - events = self.event_pool.get_events() + events = self.event_pool.drain_events() if events: self._process(events) - # repoll later on + # TODO repoll later on def _process(self, events): - # sort events - # f.e. (AS, events) => poke transaction controller + # TODO sort events + # TODO fe (AS, events) => poke transaction controller on_receive_events pass @@ -153,6 +154,7 @@ class _Recoverer(object): if txn: if txn.send(self.as_api): txn.complete(self.store) + self.backoff_counter = 1 else: self.backoff_counter += 1 self.recover(self.service) -- cgit 1.5.1 From d516d68b293448e686fe30c58d69e030e61ec955 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 10:25:50 +0000 Subject: Rejig structure given the appservice_handler already filters the correct ASes to use. --- synapse/appservice/scheduler.py | 144 ++++++++++++++++++++++++---------------- 1 file changed, 85 insertions(+), 59 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 27271e468d..19fe8e11e8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -16,17 +16,11 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: - ___________ - \O/ --- event -->| | +--------------+ - | - event ---->| event_pool|<-- poll 1/s for events ---| EventSorter | - / \ ---- event ->|___________| +--------------+ - USERS ____________________________| - | | | - V V V - ASa ASb ASc - [e,e] [e] [e,e,e] - | - V + _________ +---ASa[e]-->| Event | +----ASb[e]->| Grouper |<-poll 1/s--+ +--ASa[e]--->|_________| | ASa[e,e] ASb[e] + V -````````- +------------+ |````````|<--StoreTxn-|Transaction | |Database| | Controller |---> SEND TO AS @@ -43,11 +37,11 @@ Recoverer attempts to recover ASes who have died. The flow for this looks like: V | START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE backoff DB and try to send it - ^ |__________ -Mark AS as | V -UP & quit +---------- YES SUCCESS - | | | - NO <--- Have more txns? <------ Mark txn success & nuke -+ + ^ |___________ +Mark AS as | V +UP & quit +---------- YES SUCCESS + | | | + NO <--- Have more txns? <------ Mark txn success & nuke <-+ from db; incr AS pos. Reset backoff. @@ -62,24 +56,28 @@ class AppServiceScheduler(object): case is a simple array. """ - def __init__(self, store, as_api, services): - self.app_services = services - self.event_pool = [] + def __init__(self, clock, store, as_api): + self.clock = clock + self.store = store + self.as_api = as_api + self.event_grouper = _EventGrouper() - def create_recoverer(service): - return _Recoverer(store, as_api, service) - self.txn_ctrl = _TransactionController(store, as_api, create_recoverer) + def create_recoverer(service, callback): + return _Recoverer(clock, store, as_api, service, callback) - self.event_sorter = _EventSorter(self, self.txn_ctrl, services) + self.txn_ctrl = _TransactionController( + clock, store, as_api, self.event_grouper, create_recoverer + ) def start(self): - self.event_sorter.start_polling() - - def store_event(self, event): # event_pool - self.event_pool.append(event) + # check for any DOWN ASes and start recoverers for them. + _Recoverer.start( + self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + ) + self.txn_ctrl.start_polling() - def drain_events(self): # event_pool - return self.event_pool + def submit_event_for_as(self, service, event): + self.event_grouper.on_receive(service, event) class AppServiceTransaction(object): @@ -99,71 +97,99 @@ class AppServiceTransaction(object): pass -class _EventSorter(object): +class _EventGrouper(object): + """Groups events for the same application service together. + """ - def __init__(self, event_pool, txn_ctrl, services): - self.event_pool = event_pool - self.txn_ctrl = txn_ctrl - self.services = services + def __init__(self): + self.groups = {} # dict of {service: [events]} - def start_polling(self): - events = self.event_pool.drain_events() - if events: - self._process(events) - # TODO repoll later on - - def _process(self, events): - # TODO sort events - # TODO fe (AS, events) => poke transaction controller on_receive_events + def on_receive(self, service, event): + # TODO group this pass + def drain_groups(self): + return self.groups + class _TransactionController(object): - def __init__(self, store, as_api, recoverer_fn): + def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): + self.clock = clock self.store = store self.as_api = as_api + self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn - def on_receive_events(self, service, events): - txn = self._store_txn(service, events) - if txn.send(self.as_api): - txn.complete(self.store) - else: - self._start_recoverer(service) + def start_polling(self): + groups = self.event_grouper.drain_groups() + for service in groups: + txn_id = self._get_next_txn_id(service) + txn = AppServiceTransaction(service, txn_id, groups[service]) + self._store_txn(txn) + if self._is_service_up(service): + if txn.send(self.as_api): + txn.complete(self.store) + else: + # TODO mark AS as down + self._start_recoverer(service) + self.clock.call_later(1000, self.start_polling) + + + def on_recovered(self, service): + # TODO mark AS as UP + pass def _start_recoverer(self, service): - recoverer = self.recoverer_fn(service) + recoverer = self.recoverer_fn(service, self.on_recovered) recoverer.recover() - def _store_txn(self, service, events): - pass # returns AppServiceTransaction + def _is_service_up(self, service): + pass + + def _get_next_txn_id(self, service): + pass # TODO work out the next txn_id for this service + + def _store_txn(self, txn): + pass class _Recoverer(object): - def __init__(self, store, as_api, service): + @staticmethod + def start(clock, store, as_api, callback): + # TODO check for DOWN ASes and init recoverers + pass + + def __init__(self, clock, store, as_api, service, callback): + self.clock = clock self.store = store self.as_api = as_api self.service = service + self.callback = callback self.backoff_counter = 1 def recover(self): - # TODO wait a bit + self.clock.call_later(2000 ** self.backoff_counter, self.retry) + + def retry(self): txn = self._get_oldest_txn() if txn: if txn.send(self.as_api): txn.complete(self.store) + # reset the backoff counter and retry immediately self.backoff_counter = 1 + self.retry() + return else: self.backoff_counter += 1 - self.recover(self.service) + self.recover() return else: - self._set_service_recovered(self.service) + self._set_service_recovered() - def _set_service_recovered(self, service): - pass + def _set_service_recovered(self): + self.callback(self.service) def _get_oldest_txn(self): pass # returns AppServiceTransaction -- cgit 1.5.1 From 192e228a98f3700f48d7fd136f4dce2979ec7c90 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 11:50:27 +0000 Subject: Start adding some tests --- synapse/appservice/scheduler.py | 23 ++++---- tests/appservice/test_scheduler.py | 106 +++++++++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+), 10 deletions(-) create mode 100644 tests/appservice/test_scheduler.py (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 19fe8e11e8..754f39381f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,6 +49,8 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ +from twisted.internet import defer + class AppServiceScheduler(object): """ Public facing API for this module. Does the required DI to tie the @@ -105,11 +107,14 @@ class _EventGrouper(object): self.groups = {} # dict of {service: [events]} def on_receive(self, service, event): - # TODO group this - pass + if service not in self.groups: + self.groups[service] = [] + self.groups[service].append(event) def drain_groups(self): - return self.groups + groups = self.groups + self.groups = {} + return groups class _TransactionController(object): @@ -135,7 +140,6 @@ class _TransactionController(object): self._start_recoverer(service) self.clock.call_later(1000, self.start_polling) - def on_recovered(self, service): # TODO mark AS as UP pass @@ -172,26 +176,25 @@ class _Recoverer(object): def recover(self): self.clock.call_later(2000 ** self.backoff_counter, self.retry) + @defer.inlineCallbacks def retry(self): - txn = self._get_oldest_txn() + txn = yield self._get_oldest_txn() if txn: if txn.send(self.as_api): txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 self.retry() - return else: self.backoff_counter += 1 self.recover() - return else: self._set_service_recovered() def _set_service_recovered(self): self.callback(self.service) + @defer.inlineCallbacks def _get_oldest_txn(self): - pass # returns AppServiceTransaction - - + txn = yield self.store.get_oldest_txn(self.service) + defer.returnValue(txn) diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py new file mode 100644 index 0000000000..b41d4358cf --- /dev/null +++ b/tests/appservice/test_scheduler.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.appservice.scheduler import ( + AppServiceScheduler, AppServiceTransaction, _EventGrouper, + _TransactionController, _Recoverer +) +from twisted.internet import defer +from ..utils import MockClock +from mock import Mock +from tests import unittest + +class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.as_api = Mock() + self.store = Mock() + self.service = Mock() + self.callback = Mock() + self.recoverer = _Recoverer( + clock=self.clock, + as_api=self.as_api, + store=self.store, + service=self.service, + callback=self.callback, + ) + + def test_recover_service_single_txn(self): + txns = self._mk_txns(1) + self.store.get_oldest_txn = Mock(return_value=defer.succeed(txns[0])) + + self.recoverer.recover() + self.assertEquals(0, self.store.get_oldest_txn.call_count) + self.clock.advance_time(2000) + self.assertEquals(2, self.store.get_oldest_txn.call_count) + + def _mk_txns(self, num_txns): + return [ + Mock() for i in range(num_txns) + ] + + + +class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): + + def setUp(self): + self.grouper = _EventGrouper() + + def test_drain_single_event(self): + service = Mock() + event = Mock() + self.grouper.on_receive(service, event) + groups = self.grouper.drain_groups() + self.assertTrue(service in groups) + self.assertEquals([event], groups[service]) + self.assertEquals(1, len(groups.keys())) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) + + def test_drain_multiple_events(self): + service = Mock() + events = [Mock(), Mock(), Mock()] + for e in events: + self.grouper.on_receive(service, e) + groups = self.grouper.drain_groups() + self.assertTrue(service in groups) + self.assertEquals(events, groups[service]) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) + + def test_drain_multiple_services(self): + services = [Mock(), Mock(), Mock()] + events_a = [Mock(), Mock()] + events_b = [Mock()] + events_c = [Mock(), Mock(), Mock(), Mock()] + mappings = { + services[0]: events_a, + services[1]: events_b, + services[2]: events_c + } + for e in events_b: + self.grouper.on_receive(services[1], e) + for e in events_c: + self.grouper.on_receive(services[2], e) + for e in events_a: + self.grouper.on_receive(services[0], e) + + groups = self.grouper.drain_groups() + for service in services: + self.assertTrue(service in groups) + self.assertEquals(mappings[service], groups[service]) + self.assertEquals(3, len(groups.keys())) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) -- cgit 1.5.1 From 0fbfe1b08a791e95dc9e9d417f131e80b4ce8059 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 14:36:52 +0000 Subject: Add more tests; fix bugs. --- synapse/appservice/scheduler.py | 4 +-- tests/appservice/test_scheduler.py | 54 +++++++++++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 754f39381f..f54df9c9a5 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -174,7 +174,7 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later(2000 ** self.backoff_counter, self.retry) + self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry) @defer.inlineCallbacks def retry(self): @@ -184,7 +184,7 @@ class _Recoverer(object): txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 - self.retry() + yield self.retry() else: self.backoff_counter += 1 self.recover() diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index b41d4358cf..1e3eb9e1cc 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -21,6 +21,7 @@ from ..utils import MockClock from mock import Mock from tests import unittest + class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def setUp(self): @@ -37,21 +38,60 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): callback=self.callback, ) - def test_recover_service_single_txn(self): - txns = self._mk_txns(1) - self.store.get_oldest_txn = Mock(return_value=defer.succeed(txns[0])) + def test_recover_single_txn(self): + txn = Mock() + # return one txn to send, then no more old txns + txns = [txn, None] + + def take_txn(*args, **kwargs): + return defer.succeed(txns.pop(0)) + self.store.get_oldest_txn = Mock(side_effect=take_txn) self.recoverer.recover() + # shouldn't have called anything prior to waiting for exp backoff self.assertEquals(0, self.store.get_oldest_txn.call_count) + txn.send = Mock(return_value=True) + # wait for exp backoff self.clock.advance_time(2000) + self.assertEquals(1, txn.send.call_count) + self.assertEquals(1, txn.complete.call_count) + # 2 because it needs to get None to know there are no more txns self.assertEquals(2, self.store.get_oldest_txn.call_count) + self.assertEquals(1, self.callback.call_count) - def _mk_txns(self, num_txns): - return [ - Mock() for i in range(num_txns) - ] + def test_recover_retry_txn(self): + txn = Mock() + txns = [txn, None] + pop_txn = False + def take_txn(*args, **kwargs): + if pop_txn: + return defer.succeed(txns.pop(0)) + else: + return defer.succeed(txn) + self.store.get_oldest_txn = Mock(side_effect=take_txn) + self.recoverer.recover() + self.assertEquals(0, self.store.get_oldest_txn.call_count) + txn.send = Mock(return_value=False) + self.clock.advance_time(2000) + self.assertEquals(1, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + self.clock.advance_time(4000) + self.assertEquals(2, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + self.clock.advance_time(8000) + self.assertEquals(3, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + txn.send = Mock(return_value=True) # successfully send the txn + pop_txn = True # returns the txn the first time, then no more. + self.clock.advance_time(16000) + self.assertEquals(1, txn.send.call_count) # new mock reset call count + self.assertEquals(1, txn.complete.call_count) + self.assertEquals(1, self.callback.call_count) class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): -- cgit 1.5.1 From 141ec04d194c57f29756d6ccbda3f396cc3aa9e7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 14:53:35 +0000 Subject: Add stub ApplicationServiceTransactionStore. Bootstrap Recoverers. Fill in stub Transaction functions. --- synapse/appservice/scheduler.py | 50 +++++++++++++++++++++++++++++++++++------ synapse/storage/appservice.py | 28 +++++++++++++++++++++++ 2 files changed, 71 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index f54df9c9a5..645d7bf6b2 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -71,11 +71,13 @@ class AppServiceScheduler(object): clock, store, as_api, self.event_grouper, create_recoverer ) + @defer.inlineCallbacks def start(self): # check for any DOWN ASes and start recoverers for them. - _Recoverer.start( + recoverers = yield _Recoverer.start( self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered ) + self.txn_ctrl.add_recoverers(recoverers) self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): @@ -91,12 +93,34 @@ class AppServiceTransaction(object): self.events = events def send(self, as_api): - # TODO sends this transaction using this as_api - pass + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) def complete(self, store): - # TODO increment txn id on AS and nuke txn contents from db - pass + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) class _EventGrouper(object): @@ -125,6 +149,8 @@ class _TransactionController(object): self.as_api = as_api self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn + # keep track of how many recoverers there are + self.recoverers = [] def start_polling(self): groups = self.event_grouper.drain_groups() @@ -144,6 +170,10 @@ class _TransactionController(object): # TODO mark AS as UP pass + def add_recoverers(self, recoverers): + for r in recoverers: + self.recoverers.append(r) + def _start_recoverer(self, service): recoverer = self.recoverer_fn(service, self.on_recovered) recoverer.recover() @@ -161,9 +191,15 @@ class _TransactionController(object): class _Recoverer(object): @staticmethod + @defer.inlineCallbacks def start(clock, store, as_api, callback): - # TODO check for DOWN ASes and init recoverers - pass + services = yield store.get_failing_appservices() + recoverers = [ + _Recoverer(clock, store, as_api, s, callback) for s in services + ] + for r in recoverers: + r.recover() + defer.returnValue(recoverers) def __init__(self, clock, store, as_api, service, callback): self.clock = clock diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e30265750a..c1762692b9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -336,3 +336,31 @@ class ApplicationServiceStore(SQLBaseStore): hs_token=service["hs_token"], sender=service["sender"] )) + + +class ApplicationServiceTransactionStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceTransactionStore, self).__init__(hs) + + def get_failing_appservices(self): + """Get a list of application services which are down. + + Returns: + A Deferred which resolves to a list of ApplicationServices, which + may be empty. + """ + pass + + def complete_appservice_txn(self, txn_id, service): + """Completes an application service transaction. + + Args: + txn_id(str): The transaction ID being completed. + service(ApplicationService): The application service which was sent + this transaction. + Returns: + A Deferred which resolves to True if this transaction was completed + successfully. + """ + pass -- cgit 1.5.1 From f260cb72cd3435d540411962a92ca2a9fd333eb1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 15:12:24 +0000 Subject: Flesh out more stub functions. --- synapse/appservice/__init__.py | 5 +++++ synapse/appservice/scheduler.py | 37 +++++++++++++++++++++++++++++-------- synapse/storage/appservice.py | 17 +++++++++++++++-- tests/appservice/test_scheduler.py | 5 +++-- 4 files changed, 52 insertions(+), 12 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index a268a6bcc4..cc6c381566 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -20,6 +20,11 @@ import re logger = logging.getLogger(__name__) +class ApplicationServiceState(object): + DOWN = "down" + UP = "up" + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 645d7bf6b2..99e83747a8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,7 +49,11 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ +from synapse.appservice import ApplicationServiceState from twisted.internet import defer +import logging + +logger = logging.getLogger(__name__) class AppServiceScheduler(object): @@ -162,21 +166,36 @@ class _TransactionController(object): if txn.send(self.as_api): txn.complete(self.store) else: - # TODO mark AS as down self._start_recoverer(service) self.clock.call_later(1000, self.start_polling) - def on_recovered(self, service): - # TODO mark AS as UP - pass + @defer.inlineCallbacks + def on_recovered(self, recoverer): + applied_state = yield self.store.set_appservice_state( + recoverer.service, + ApplicationServiceState.UP + ) + if not applied_state: + logger.error("Failed to apply appservice state UP to service %s", + recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) + @defer.inlineCallbacks def _start_recoverer(self, service): - recoverer = self.recoverer_fn(service, self.on_recovered) - recoverer.recover() + applied_state = yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + if applied_state: + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + else: + logger.error("Failed to apply appservice state DOWN to service %s", + service) def _is_service_up(self, service): pass @@ -193,7 +212,9 @@ class _Recoverer(object): @staticmethod @defer.inlineCallbacks def start(clock, store, as_api, callback): - services = yield store.get_failing_appservices() + services = yield store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) recoverers = [ _Recoverer(clock, store, as_api, s, callback) for s in services ] @@ -228,7 +249,7 @@ class _Recoverer(object): self._set_service_recovered() def _set_service_recovered(self): - self.callback(self.service) + self.callback(self) @defer.inlineCallbacks def _get_oldest_txn(self): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c1762692b9..214f6d99c5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -343,15 +343,28 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceTransactionStore, self).__init__(hs) - def get_failing_appservices(self): - """Get a list of application services which are down. + def get_appservices_by_state(self, state): + """Get a list of application services based on their state. + Args: + state(ApplicationServiceState): The state to filter on. Returns: A Deferred which resolves to a list of ApplicationServices, which may be empty. """ pass + def set_appservice_state(self, service, state): + """Set the application service state. + + Args: + service(ApplicationService): The service whose state to set. + state(ApplicationServiceState): The connectivity state to apply. + Returns: + A Deferred which resolves to True if the state was set successfully. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 1e3eb9e1cc..ec8f77c54b 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -57,7 +57,8 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns self.assertEquals(2, self.store.get_oldest_txn.call_count) - self.assertEquals(1, self.callback.call_count) + self.callback.assert_called_once_with(self.recoverer) + self.assertEquals(self.recoverer.service, self.service) def test_recover_retry_txn(self): txn = Mock() @@ -91,7 +92,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.clock.advance_time(16000) self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.complete.call_count) - self.assertEquals(1, self.callback.call_count) + self.callback.assert_called_once_with(self.recoverer) class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): -- cgit 1.5.1 From 7d3491c74180461dc9d49fc89dad233e240ac475 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 15:17:50 +0000 Subject: Add some loggers --- synapse/appservice/scheduler.py | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 99e83747a8..2b3aa3b0ea 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -171,6 +171,10 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): + self.recoverers.remove(recoverer) + logger.info("Successfully recovered application service: %s", + recoverer.service) + logger.info("Active recoverers: %s", len(self.recoverers)) applied_state = yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP @@ -182,6 +186,8 @@ class _TransactionController(object): def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) + if len(recoverers) > 0: + logger.info("Active recoverers: %s", len(self.recoverers)) @defer.inlineCallbacks def _start_recoverer(self, service): @@ -190,6 +196,10 @@ class _TransactionController(object): ApplicationServiceState.DOWN ) if applied_state: + logger.info( + "Application service falling behind. Starting recoverer. %s", + service + ) recoverer = self.recoverer_fn(service, self.on_recovered) self.add_recoverers([recoverer]) recoverer.recover() -- cgit 1.5.1 From 0354659f9d8b60b9edc78b0b597bceb52b8c7b2b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:09:05 +0000 Subject: Finish synapse.appservice.scheduler implementation. With tests to assert behaviour. Not hooked up yet. Stub datastore methods not implemented yet. --- synapse/appservice/__init__.py | 39 +++++++++++++ synapse/appservice/scheduler.py | 63 ++++---------------- synapse/storage/appservice.py | 22 +++++++ tests/appservice/test_scheduler.py | 115 ++++++++++++++++++++++++++++++++++++- 4 files changed, 186 insertions(+), 53 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index cc6c381566..743a8278ad 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -25,6 +25,45 @@ class ApplicationServiceState(object): UP = "up" +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) + + def complete(self, store): + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 2b3aa3b0ea..50ad3b8e83 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -88,45 +88,6 @@ class AppServiceScheduler(object): self.event_grouper.on_receive(service, event) -class AppServiceTransaction(object): - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - def send(self, as_api): - """Sends this transaction using the provided AS API interface. - - Args: - as_api(ApplicationServiceApi): The API to use to send. - Returns: - A Deferred which resolves to True if the transaction was sent. - """ - return as_api.push_bulk( - service=self.service, - events=self.events, - txn_id=self.id - ) - - def complete(self, store): - """Completes this transaction as successful. - - Marks this transaction ID on the application service and removes the - transaction contents from the database. - - Args: - store: The database store to operate on. - Returns: - A Deferred which resolves to True if the transaction was completed. - """ - return store.complete_appservice_txn( - service=self.service, - txn_id=self.id - ) - - class _EventGrouper(object): """Groups events for the same application service together. """ @@ -156,14 +117,18 @@ class _TransactionController(object): # keep track of how many recoverers there are self.recoverers = [] + @defer.inlineCallbacks def start_polling(self): groups = self.event_grouper.drain_groups() for service in groups: - txn_id = self._get_next_txn_id(service) - txn = AppServiceTransaction(service, txn_id, groups[service]) - self._store_txn(txn) - if self._is_service_up(service): - if txn.send(self.as_api): + txn = yield self.store.create_appservice_txn( + service=service, + events=groups[service] + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: txn.complete(self.store) else: self._start_recoverer(service) @@ -207,14 +172,10 @@ class _TransactionController(object): logger.error("Failed to apply appservice state DOWN to service %s", service) + @defer.inlineCallbacks def _is_service_up(self, service): - pass - - def _get_next_txn_id(self, service): - pass # TODO work out the next txn_id for this service - - def _store_txn(self, txn): - pass + state = yield self.store.get_appservice_state(service) + defer.returnValue(state == ApplicationServiceState.UP) class _Recoverer(object): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 214f6d99c5..6fde7dcc66 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -354,6 +354,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def get_appservice_state(self, service): + """Get the application service state. + + Args: + service(ApplicationService): The service whose state to set. + Returns: + A Deferred which resolves to ApplicationServiceState. + """ + pass + def set_appservice_state(self, service, state): """Set the application service state. @@ -365,6 +375,18 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def create_appservice_txn(self, service, events): + """Atomically creates a new transaction for this application service + with the given list of events. + + Args: + service(ApplicationService): The service who the transaction is for. + events(list): A list of events to put in the transaction. + Returns: + ApplicationServiceTransaction: A new transaction. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index ec8f77c54b..a31755da67 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, AppServiceTransaction, _EventGrouper, - _TransactionController, _Recoverer + AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -22,6 +22,116 @@ from mock import Mock from tests import unittest +class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.store = Mock() + self.as_api = Mock() + self.event_grouper = Mock() + self.recoverer = Mock() + self.recoverer_fn = Mock(return_value=self.recoverer) + self.txnctrl = _TransactionController( + clock=self.clock, store=self.store, as_api=self.as_api, + event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + ) + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is successfully sent. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + txn.send = Mock(return_value=defer.succeed(True)) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made + txn.complete.assert_called_once_with(self.store) # txn completed + + def test_poll_single_group_service_down(self): + # Test: The AS is down so it shouldn't push; Recoverers will do it. + # It should still make a transaction though. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + + self.event_grouper.drain_groups = Mock(return_value=groups) + txn = Mock(id="idhere", service=service, events=events) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.DOWN) + ) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, txn.send.call_count) # txn not sent though + self.assertEquals(0, txn.complete.call_count) # or completed + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is not sent. A Recoverer is made and + # started. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + self.store.set_appservice_state = Mock(return_value=defer.succeed(True)) + txn.send = Mock(return_value=defer.succeed(False)) # fails to send + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events + ) + self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made + self.assertEquals(1, self.recoverer.recover.call_count) # and invoked + self.assertEquals(1, len(self.txnctrl.recoverers)) # and stored + self.assertEquals(0, txn.complete.call_count) # txn not completed + self.store.set_appservice_state.assert_called_once_with( + service, ApplicationServiceState.DOWN # service marked as down + ) + + def test_poll_no_groups(self): + self.as_api.push_bulk = Mock() + self.event_grouper.drain_groups = Mock(return_value={}) + self.txnctrl.start_polling() + self.assertEquals(0, self.as_api.push_bulk.call_count) + + class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def setUp(self): @@ -94,6 +204,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) + class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def setUp(self): -- cgit 1.5.1 From 2602ddc379f9bede21cafc8c8f7f57dec44cf69d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:16:14 +0000 Subject: Apply clarity and docstrings --- synapse/appservice/scheduler.py | 2 +- synapse/storage/appservice.py | 14 +++++++++++++- tests/appservice/test_scheduler.py | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 50ad3b8e83..514148c947 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -224,5 +224,5 @@ class _Recoverer(object): @defer.inlineCallbacks def _get_oldest_txn(self): - txn = yield self.store.get_oldest_txn(self.service) + txn = yield self.store.get_oldest_unsent_txn(self.service) defer.returnValue(txn) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 6fde7dcc66..4447c8a2e1 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -383,7 +383,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The service who the transaction is for. events(list): A list of events to put in the transaction. Returns: - ApplicationServiceTransaction: A new transaction. + AppServiceTransaction: A new transaction. """ pass @@ -399,3 +399,15 @@ class ApplicationServiceTransactionStore(SQLBaseStore): successfully. """ pass + + def get_oldest_unsent_txn(self, service): + """Get the oldest transaction which has not been sent for this + service. + + Args: + service(ApplicationService): The app service to get the oldest txn. + Returns: + A Deferred which resolves to an AppServiceTransaction or + None. + """ + pass diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index a31755da67..f75a6f5d95 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer + _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock -- cgit 1.5.1 From 64345b75597cba56e12a172fb227ac2c67993bbd Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:41:19 +0000 Subject: Upper bound the backoff. --- synapse/appservice/scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 514148c947..ee5978da6e 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -214,7 +214,9 @@ class _Recoverer(object): self.backoff_counter = 1 yield self.retry() else: - self.backoff_counter += 1 + # cap the backoff to be around 18h => (2^16) = 65536 secs + if self.backoff_counter < 16: + self.backoff_counter += 1 self.recover() else: self._set_service_recovered() -- cgit 1.5.1 From 01c099d9ef2b3891643845031c917fd0dc41d954 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:16:47 +0000 Subject: Add appservice txns sql schema --- synapse/storage/__init__.py | 2 +- synapse/storage/appservice.py | 6 +++++ .../storage/schema/delta/15/appservice_txns.sql | 31 ++++++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/15/appservice_txns.sql (limited to 'synapse') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a3ff995695..dfce5224a9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,7 +57,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 4447c8a2e1..eec8fbd592 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -385,6 +385,8 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ + # TODO: work out txn id (highest txn id for this service += 1) + # TODO: Within same db transaction, Insert new txn into txn table pass def complete_appservice_txn(self, txn_id, service): @@ -398,6 +400,8 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to True if this transaction was completed successfully. """ + # TODO: Set current txn_id for AS to 'txn_id' + # TODO: Delete txn contents pass def get_oldest_unsent_txn(self, service): @@ -410,4 +414,6 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ + # TODO: Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) pass diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql new file mode 100644 index 0000000000..11f0c799aa --- /dev/null +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -0,0 +1,31 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id INTEGER PRIMARY KEY, + state TEXT NOT NULL, + last_txn TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id INTEGER NOT NULL, + txn_id INTEGER NOT NULL, + content TEXT NOT NULL, + UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK +); + + + -- cgit 1.5.1 From 4a6afa6abf6c90c393bc3fa00e40d3927fc0c6c1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:27:55 +0000 Subject: Assign the AS ID from the database; replace old placeholder txn id. --- synapse/appservice/__init__.py | 4 ++-- synapse/storage/appservice.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 743a8278ad..c60db16b74 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -79,13 +79,13 @@ class ApplicationService(object): NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] def __init__(self, token, url=None, namespaces=None, hs_token=None, - sender=None, txn_id=None): + sender=None, id=None): self.token = token self.url = url self.hs_token = hs_token self.sender = sender self.namespaces = self._check_namespaces(namespaces) - self.txn_id = txn_id + self.id = id def _check_namespaces(self, namespaces): # Sanity check that it is of the form: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index eec8fbd592..582269b8d5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -302,6 +302,7 @@ class ApplicationServiceStore(SQLBaseStore): if as_token not in services: # add the service services[as_token] = { + "id": res["as_id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -326,7 +327,6 @@ class ApplicationServiceStore(SQLBaseStore): except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) - # TODO get last successful txn id f.e. service for service in services.values(): logger.info("Found application service: %s", service) self.services_cache.append(ApplicationService( @@ -334,7 +334,8 @@ class ApplicationServiceStore(SQLBaseStore): url=service["url"], namespaces=service["namespaces"], hs_token=service["hs_token"], - sender=service["sender"] + sender=service["sender"], + id=service["id"] )) -- cgit 1.5.1 From 406d32f8b514a572627eef1326d472e2825b2fe1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:35:14 +0000 Subject: Start implementing ApplicationServiceTransactionStore --- synapse/storage/appservice.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 582269b8d5..0b272e82dd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -374,7 +374,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: A Deferred which resolves to True if the state was set successfully. """ - pass + return self._simple_upsert( + "application_services_state", + dict(as_id=service.id), + dict(state=state) + ) def create_appservice_txn(self, service, events): """Atomically creates a new transaction for this application service -- cgit 1.5.1 From 1c2dcf762a8fe28390e9a98a01577aaadca7f1c0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 13:10:31 +0000 Subject: Partially implement txn store methods with tests. --- synapse/storage/appservice.py | 61 ++++++++++---- tests/storage/test_appservice.py | 171 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 213 insertions(+), 19 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 0b272e82dd..37078f9ef0 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson from simplejson import JSONDecodeError +import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -142,7 +142,7 @@ class ApplicationServiceStore(SQLBaseStore): txn.execute( "INSERT INTO application_services_regex(" "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, simplejson.dumps(regex_obj)) + (as_id, ns_int, json.dumps(regex_obj)) ) return True @@ -277,12 +277,7 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - @defer.inlineCallbacks - def _populate_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + def _parse_services_dict(self, results): # SQL results in the form: # [ # { @@ -296,13 +291,12 @@ class ApplicationServiceStore(SQLBaseStore): # } # ] services = {} - results = yield self._execute_and_decode(sql) for res in results: as_token = res["token"] if as_token not in services: # add the service services[as_token] = { - "id": res["as_id"], + "id": res["id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -320,16 +314,16 @@ class ApplicationServiceStore(SQLBaseStore): try: services[as_token]["namespaces"][ ApplicationService.NS_LIST[ns_int]].append( - simplejson.loads(res["regex"]) + json.loads(res["regex"]) ) except IndexError: logger.error("Bad namespace enum '%s'. %s", ns_int, res) except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) + service_list = [] for service in services.values(): - logger.info("Found application service: %s", service) - self.services_cache.append(ApplicationService( + service_list.append(ApplicationService( token=service["token"], url=service["url"], namespaces=service["namespaces"], @@ -337,6 +331,21 @@ class ApplicationServiceStore(SQLBaseStore): sender=service["sender"], id=service["id"] )) + return service_list + + @defer.inlineCallbacks + def _populate_cache(self): + """Populates the ApplicationServiceCache from the database.""" + sql = ("SELECT * FROM application_services LEFT JOIN " + "application_services_regex ON application_services.id = " + "application_services_regex.as_id") + + results = yield self._execute_and_decode(sql) + services = self._parse_services_dict(results) + + for service in services: + logger.info("Found application service: %s", service) + self.services_cache.append(service) class ApplicationServiceTransactionStore(SQLBaseStore): @@ -344,6 +353,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceTransactionStore, self).__init__(hs) + @defer.inlineCallbacks def get_appservices_by_state(self, state): """Get a list of application services based on their state. @@ -353,8 +363,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to a list of ApplicationServices, which may be empty. """ - pass + sql = ( + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN " + "application_services AS a ON a.id=s.as_id LEFT JOIN " + "application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + ) + results = yield self._execute_and_decode(sql, state) + # NB: This assumes this class is linked with ApplicationServiceStore + defer.returnValue(self._parse_services_dict(results)) + @defer.inlineCallbacks def get_appservice_state(self, service): """Get the application service state. @@ -363,7 +381,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: A Deferred which resolves to ApplicationServiceState. """ - pass + result = yield self._simple_select_one( + "application_services_state", + dict(as_id=service.id), + ["state"], + allow_none=True + ) + if result: + defer.returnValue(result.get("state")) + return + defer.returnValue(None) def set_appservice_state(self, service, state): """Set the application service state. @@ -372,7 +399,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The service whose state to set. state(ApplicationServiceState): The connectivity state to apply. Returns: - A Deferred which resolves to True if the state was set successfully. + A Deferred which resolves when the state was set successfully. """ return self._simple_upsert( "application_services_state", diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index ca5b92ec85..30c0b43d96 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -15,9 +15,11 @@ from tests import unittest from twisted.internet import defer -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.server import HomeServer -from synapse.storage.appservice import ApplicationServiceStore +from synapse.storage.appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -114,3 +116,168 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): def test_retrieval_of_all_services(self): services = yield self.store.get_app_services() self.assertEquals(len(services), 3) + + +class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def setUp(self): + self.db_pool = SQLiteMemoryDbPool() + yield self.db_pool.prepare() + hs = HomeServer( + "test", db_pool=self.db_pool, clock=MockClock(), config=Mock() + ) + self.as_list = [ + { + "token": "token1", + "url": "https://matrix-as.org", + "id": 3 + }, + { + "token": "alpha_tok", + "url": "https://alpha.com", + "id": 5 + }, + { + "token": "beta_tok", + "url": "https://beta.com", + "id": 6 + }, + { + "token": "delta_tok", + "url": "https://delta.com", + "id": 7 + }, + ] + for s in self.as_list: + yield self._add_service(s["id"], s["url"], s["token"]) + self.store = TestTransactionStore(hs) + + def _add_service(self, as_id, url, token): + return self.db_pool.runQuery( + "INSERT INTO application_services(id, url, token) VALUES(?,?,?)", + (as_id, url, token) + ) + + def _set_state(self, id, state, txn=None): + return self.db_pool.runQuery( + "INSERT INTO application_services_state(as_id, state, last_txn) " + "VALUES(?,?,?)", + (id, state, txn) + ) + + @defer.inlineCallbacks + def test_get_appservice_state_none(self): + service = Mock(id=999) + state = yield self.store.get_appservice_state(service) + self.assertEquals(None, state) + + @defer.inlineCallbacks + def test_get_appservice_state_up(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.UP + ) + service = Mock(id=self.as_list[0]["id"]) + state = yield self.store.get_appservice_state(service) + self.assertEquals(ApplicationServiceState.UP, state) + + @defer.inlineCallbacks + def test_get_appservice_state_down(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.UP + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[2]["id"], ApplicationServiceState.DOWN + ) + service = Mock(id=self.as_list[1]["id"]) + state = yield self.store.get_appservice_state(service) + self.assertEquals(ApplicationServiceState.DOWN, state) + + @defer.inlineCallbacks + def test_get_appservices_by_state_none(self): + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(0, len(services)) + + @defer.inlineCallbacks + def test_set_appservices_state_down(self): + service = Mock(id=self.as_list[1]["id"]) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + rows = yield self.db_pool.runQuery( + "SELECT as_id FROM application_services_state WHERE state=?", + (ApplicationServiceState.DOWN,) + ) + self.assertEquals(service.id, rows[0][0]) + + @defer.inlineCallbacks + def test_set_appservices_state_multiple_up(self): + service = Mock(id=self.as_list[1]["id"]) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.UP + ) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.UP + ) + rows = yield self.db_pool.runQuery( + "SELECT as_id FROM application_services_state WHERE state=?", + (ApplicationServiceState.UP,) + ) + self.assertEquals(service.id, rows[0][0]) + + @defer.inlineCallbacks + def test_get_appservices_by_state_single(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.UP + ) + + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(1, len(services)) + self.assertEquals(self.as_list[0]["id"], services[0].id) + + @defer.inlineCallbacks + def test_get_appservices_by_state_multiple(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.UP + ) + yield self._set_state( + self.as_list[2]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[3]["id"], ApplicationServiceState.UP + ) + + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(2, len(services)) + self.assertEquals(self.as_list[2]["id"], services[0].id) + self.assertEquals(self.as_list[0]["id"], services[1].id) + + +# required for ApplicationServiceTransactionStoreTestCase tests +class TestTransactionStore(ApplicationServiceTransactionStore, + ApplicationServiceStore): + + def __init__(self, hs): + super(TestTransactionStore, self).__init__(hs) -- cgit 1.5.1 From 1ead1caa18bdbf708446f1faa3d6f3dd13e63c29 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 13:54:20 +0000 Subject: Implement create_appservice_txn with tests. --- synapse/storage/appservice.py | 46 ++++++++++++++++++++++++--- tests/storage/test_appservice.py | 67 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 37078f9ef0..1360a00eae 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,9 +17,10 @@ from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer +from syutil.jsonutil import encode_canonical_json from synapse.api.constants import Membership from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService, ApplicationServiceState +from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -417,9 +418,46 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ - # TODO: work out txn id (highest txn id for this service += 1) - # TODO: Within same db transaction, Insert new txn into txn table - pass + return self.runInteraction( + "create_appservice_txn", + self._create_appservice_txn, + service, events + ) + + def _create_appservice_txn(self, txn, service, events): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + result = txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service.id,) + ) + last_txn_id = result.fetchone() + if last_txn_id is None: # no row exists + last_txn_id = 0 + else: + last_txn_id = int(last_txn_id[0]) # select 'last_txn' col + + result = txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = result.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 + + new_txn_id = max(highest_txn_id, last_txn_id) + 1 + + # Insert new txn into txn table + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, content) " + "VALUES(?,?,?)", + (service.id, new_txn_id, encode_canonical_json(events)) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 30c0b43d96..7a8cdb5593 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -21,6 +21,7 @@ from synapse.storage.appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) +import json from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -166,6 +167,20 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): (id, state, txn) ) + def _insert_txn(self, as_id, txn_id, content): + return self.db_pool.runQuery( + "INSERT INTO application_services_txns(as_id, txn_id, content) " + "VALUES(?,?,?)", + (as_id, txn_id, json.dumps(content)) + ) + + def _set_last_txn(self, as_id, txn_id): + return self.db_pool.runQuery( + "INSERT INTO application_services_state(as_id, last_txn, state) " + "VALUES(?,?,?)", + (as_id, txn_id, ApplicationServiceState.UP) + ) + @defer.inlineCallbacks def test_get_appservice_state_none(self): service = Mock(id=999) @@ -237,6 +252,58 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) self.assertEquals(service.id, rows[0][0]) + @defer.inlineCallbacks + def test_create_appservice_txn_first(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 1) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_older_last_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) # AS is falling behind + yield self._insert_txn(service.id, 9644, events) + yield self._insert_txn(service.id, 9645, events) + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9646) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_up_to_date_last_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9644) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_up_fuzzing(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) + + # dump in rows with higher IDs to make sure the queries aren't wrong. + yield self._set_last_txn(self.as_list[1]["id"], 119643) + yield self._set_last_txn(self.as_list[2]["id"], 9) + yield self._set_last_txn(self.as_list[3]["id"], 9643) + yield self._insert_txn(self.as_list[1]["id"], 119644, events) + yield self._insert_txn(self.as_list[1]["id"], 119645, events) + yield self._insert_txn(self.as_list[1]["id"], 119646, events) + yield self._insert_txn(self.as_list[2]["id"], 10, events) + yield self._insert_txn(self.as_list[3]["id"], 9643, events) + + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9644) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + @defer.inlineCallbacks def test_get_appservices_by_state_single(self): yield self._set_state( -- cgit 1.5.1 From 0a60bbf4fac4262da3fee702ca46d2f019597ef1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 15:53:03 +0000 Subject: Finish appservice txn storage impl and tests. --- synapse/storage/appservice.py | 85 ++++++++++++++++++---- .../storage/schema/delta/15/appservice_txns.sql | 2 +- tests/storage/test_appservice.py | 68 +++++++++++++++++ 3 files changed, 139 insertions(+), 16 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 1360a00eae..d89b0cc8c9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -429,15 +429,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # The highest id may be the last one sent (in which case it is last_txn) # or it may be the highest in the txns list (which are waiting to be/are # being sent) - result = txn.execute( - "SELECT last_txn FROM application_services_state WHERE as_id=?", - (service.id,) - ) - last_txn_id = result.fetchone() - if last_txn_id is None: # no row exists - last_txn_id = 0 - else: - last_txn_id = int(last_txn_id[0]) # select 'last_txn' col + last_txn_id = self._get_last_txn(txn, service.id) result = txn.execute( "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", @@ -467,12 +459,43 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The application service which was sent this transaction. Returns: - A Deferred which resolves to True if this transaction was completed + A Deferred which resolves if this transaction was stored successfully. """ - # TODO: Set current txn_id for AS to 'txn_id' - # TODO: Delete txn contents - pass + return self.runInteraction( + "complete_appservice_txn", + self._complete_appservice_txn, + txn_id, service + ) + + def _complete_appservice_txn(self, txn, txn_id, service): + txn_id = int(txn_id) + + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) + ) + + # Delete txn contents + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) def get_oldest_unsent_txn(self, service): """Get the oldest transaction which has not been sent for this @@ -484,6 +507,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ - # TODO: Monotonically increasing txn ids, so just select the smallest + return self.runInteraction( + "get_oldest_unsent_appservice_txn", + self._get_oldest_unsent_txn, + service + ) + + def _get_oldest_unsent_txn(self, txn, service): + # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) - pass + result = txn.execute( + "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + entry = self.cursor_to_dict(result)[0] + + if not entry or entry["txn_id"] is None: + # the min(txn_id) part will force a row, so entry may not be None + return None + + return AppServiceTransaction( + service=service, id=entry["txn_id"], events=json.loads( + entry["content"] + ) + ) + + def _get_last_txn(self, txn, service_id): + result = txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service_id,) + ) + last_txn_id = result.fetchone() + if last_txn_id is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 11f0c799aa..ff15aa019e 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -15,7 +15,7 @@ CREATE TABLE IF NOT EXISTS application_services_state( as_id INTEGER PRIMARY KEY, - state TEXT NOT NULL, + state TEXT, last_txn TEXT, FOREIGN KEY(as_id) REFERENCES application_services(id) ); diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 7a8cdb5593..d1809c7f3b 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -304,6 +304,74 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) + @defer.inlineCallbacks + def test_complete_appservice_txn_first_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"foo": "bar"}] + txn_id = 1 + + yield self._insert_txn(service.id, txn_id, events) + yield self.store.complete_appservice_txn(txn_id=txn_id, service=service) + + res = yield self.db_pool.runQuery( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service.id,) + ) + self.assertEquals(1, len(res)) + self.assertEquals(str(txn_id), res[0][0]) + + res = yield self.db_pool.runQuery( + "SELECT * FROM application_services_txns WHERE txn_id=?", + (txn_id,) + ) + self.assertEquals(0, len(res)) + + @defer.inlineCallbacks + def test_complete_appservice_txn_existing_in_state_table(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"foo": "bar"}] + txn_id = 5 + yield self._set_last_txn(service.id, 4) + yield self._insert_txn(service.id, txn_id, events) + yield self.store.complete_appservice_txn(txn_id=txn_id, service=service) + + res = yield self.db_pool.runQuery( + "SELECT last_txn, state FROM application_services_state WHERE " + "as_id=?", + (service.id,) + ) + self.assertEquals(1, len(res)) + self.assertEquals(str(txn_id), res[0][0]) + self.assertEquals(ApplicationServiceState.UP, res[0][1]) + + res = yield self.db_pool.runQuery( + "SELECT * FROM application_services_txns WHERE txn_id=?", + (txn_id,) + ) + self.assertEquals(0, len(res)) + + @defer.inlineCallbacks + def test_get_oldest_unsent_txn_none(self): + service = Mock(id=self.as_list[0]["id"]) + + txn = yield self.store.get_oldest_unsent_txn(service) + self.assertEquals(None, txn) + + @defer.inlineCallbacks + def test_get_oldest_unsent_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + + yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"}) + yield self._insert_txn(service.id, 10, events) + yield self._insert_txn(service.id, 11, [{"foo":"bar"}]) + yield self._insert_txn(service.id, 12, [{"argh":"bargh"}]) + + txn = yield self.store.get_oldest_unsent_txn(service) + self.assertEquals(service, txn.service) + self.assertEquals(10, txn.id) + self.assertEquals(events, txn.events) + @defer.inlineCallbacks def test_get_appservices_by_state_single(self): yield self._set_state( -- cgit 1.5.1 From 21fd84dcb8645a555cc35adb8b2a5a68536b8087 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:01:19 +0000 Subject: Use seconds; start gluing in the AS scheduler into the AS handler. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/__init__.py | 8 +++++++- synapse/handlers/appservice.py | 17 ++++++++++++++--- synapse/storage/__init__.py | 7 +++++-- tests/appservice/test_scheduler.py | 10 +++++----- tests/handlers/test_appservice.py | 7 +++++-- 6 files changed, 38 insertions(+), 15 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index ee5978da6e..068d4bd087 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -132,7 +132,7 @@ class _TransactionController(object): txn.complete(self.store) else: self._start_recoverer(service) - self.clock.call_later(1000, self.start_polling) + self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks def on_recovered(self, recoverer): @@ -202,7 +202,7 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry) + self.clock.call_later((2 ** self.backoff_counter), self.retry) @defer.inlineCallbacks def retry(self): diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 8d345bf936..0c51d615ec 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( @@ -54,7 +55,12 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( - hs, ApplicationServiceApi(hs) + hs, asapi, AppServiceScheduler( + clock=hs.get_clock(), + store=hs.get_datastore(), + as_api=asapi + ) ) self.sync_handler = SyncHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2c488a46f6..f3cd458e6b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -26,15 +26,22 @@ import logging logger = logging.getLogger(__name__) +def log_failure(failure): + logger.error("Application Services Failure: %s", failure.value) + logger.error(failure.getTraceback()) + + # NB: Purposefully not inheriting BaseHandler since that contains way too much # setup code which this handler does not need or use. This makes testing a lot # easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api): + def __init__(self, hs, appservice_api, appservice_scheduler): self.store = hs.get_datastore() self.hs = hs self.appservice_api = appservice_api + self.scheduler = appservice_scheduler + self.started_scheduler = False @defer.inlineCallbacks def register(self, app_service): @@ -90,9 +97,13 @@ class ApplicationServicesHandler(object): if event.type == EventTypes.Member: yield self._check_user_exists(event.state_key) - # Fork off pushes to these services - XXX First cut, best effort + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services for service in services: - self.appservice_api.push(service, event) + self.scheduler.submit_event_for_as(service, event) @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index dfce5224a9..6c159b52a0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.api.constants import EventTypes -from .appservice import ApplicationServiceStore +from .appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -79,7 +81,8 @@ class DataStore(RoomMemberStore, RoomStore, RejectionsStore, FilteringStore, PusherStore, - PushRuleStore + PushRuleStore, + ApplicationServiceTransactionStore ): def __init__(self, hs): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 9532bf66b8..e18e879319 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -162,7 +162,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=True) # wait for exp backoff - self.clock.advance_time(2000) + self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns @@ -185,21 +185,21 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.recoverer.recover() self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=False) - self.clock.advance_time(2000) + self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - self.clock.advance_time(4000) + self.clock.advance_time(4) self.assertEquals(2, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - self.clock.advance_time(8000) + self.clock.advance_time(8) self.assertEquals(3, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) txn.send = Mock(return_value=True) # successfully send the txn pop_txn = True # returns the txn the first time, then no more. - self.clock.advance_time(16000) + self.clock.advance_time(16) self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a2c541317c..06cb1dd4cf 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -27,10 +27,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): def setUp(self): self.mock_store = Mock() self.mock_as_api = Mock() + self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) self.handler = ApplicationServicesHandler( - hs, self.mock_as_api + hs, self.mock_as_api, self.mock_scheduler ) @defer.inlineCallbacks @@ -52,7 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() yield self.handler.notify_interested_services(event) - self.mock_as_api.push.assert_called_once_with(interested_service, event) + self.mock_scheduler.submit_event_for_as.assert_called_once_with( + interested_service, event + ) @defer.inlineCallbacks def test_query_room_alias_exists(self): -- cgit 1.5.1 From b98cd03193476dea5f8b47e79d4122bb18449ae2 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:25:20 +0000 Subject: Use event IDs instead of dumping event content in the txns table. --- synapse/storage/appservice.py | 14 +++++----- .../storage/schema/delta/15/appservice_txns.sql | 2 +- tests/storage/test_appservice.py | 30 ++++++++++++---------- 3 files changed, 26 insertions(+), 20 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d89b0cc8c9..c3c0a0bd43 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -442,10 +442,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): new_txn_id = max(highest_txn_id, last_txn_id) + 1 # Insert new txn into txn table + event_ids = [e.event_id for e in events] txn.execute( - "INSERT INTO application_services_txns(as_id, txn_id, content) " + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (service.id, new_txn_id, encode_canonical_json(events)) + (service.id, new_txn_id, json.dumps(event_ids)) ) return AppServiceTransaction( service=service, id=new_txn_id, events=events @@ -491,7 +492,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): dict(last_txn=txn_id) ) - # Delete txn contents + # Delete txn self._simple_delete_txn( txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id) @@ -526,10 +527,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # the min(txn_id) part will force a row, so entry may not be None return None + event_ids = json.loads(entry["event_ids"]) + events = self._get_events_txn(event_ids) + return AppServiceTransaction( - service=service, id=entry["txn_id"], events=json.loads( - entry["content"] - ) + service=service, id=entry["txn_id"], events=events ) def _get_last_txn(self, txn, service_id): diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index ff15aa019e..13bbb2de2e 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS application_services_state( CREATE TABLE IF NOT EXISTS application_services_txns( as_id INTEGER NOT NULL, txn_id INTEGER NOT NULL, - content TEXT NOT NULL, + event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK ); diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index d1809c7f3b..e79599f7fb 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -167,11 +167,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): (id, state, txn) ) - def _insert_txn(self, as_id, txn_id, content): + def _insert_txn(self, as_id, txn_id, events): return self.db_pool.runQuery( - "INSERT INTO application_services_txns(as_id, txn_id, content) " + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (as_id, txn_id, json.dumps(content)) + (as_id, txn_id, json.dumps([e.event_id for e in events])) ) def _set_last_txn(self, as_id, txn_id): @@ -255,7 +255,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_first(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn = yield self.store.create_appservice_txn(service, events) self.assertEquals(txn.id, 1) self.assertEquals(txn.events, events) @@ -264,7 +264,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_older_last_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) # AS is falling behind yield self._insert_txn(service.id, 9644, events) yield self._insert_txn(service.id, 9645, events) @@ -276,7 +276,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_up_to_date_last_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) txn = yield self.store.create_appservice_txn(service, events) self.assertEquals(txn.id, 9644) @@ -286,7 +286,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_up_fuzzing(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) # dump in rows with higher IDs to make sure the queries aren't wrong. @@ -307,7 +307,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_complete_appservice_txn_first_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"foo": "bar"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 1 yield self._insert_txn(service.id, txn_id, events) @@ -329,7 +329,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_complete_appservice_txn_existing_in_state_table(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"foo": "bar"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 5 yield self._set_last_txn(service.id, 4) yield self._insert_txn(service.id, txn_id, events) @@ -360,12 +360,16 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] + other_events = [Mock(event_id="e5"), Mock(event_id="e6")] - yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"}) + # we aren't testing store._base stuff here, so mock this out + self.store._get_events_txn = Mock(return_value=events) + + yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) - yield self._insert_txn(service.id, 11, [{"foo":"bar"}]) - yield self._insert_txn(service.id, 12, [{"argh":"bargh"}]) + yield self._insert_txn(service.id, 11, other_events) + yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) -- cgit 1.5.1 From 04c9751f24885b974d564b3e5749b7fc9ce01c73 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:45:41 +0000 Subject: Bug fixes whilst putting it all together --- synapse/appservice/api.py | 1 + synapse/appservice/scheduler.py | 4 +++- synapse/storage/appservice.py | 9 ++++----- 3 files changed, 8 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c17fb219c5..3acb8867a2 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -79,6 +79,7 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("push_bulk: Missing txn ID sending events to %s", service.url) txn_id = str(0) + txn_id = str(txn_id) uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 068d4bd087..3ee2406463 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -175,7 +175,7 @@ class _TransactionController(object): @defer.inlineCallbacks def _is_service_up(self, service): state = yield self.store.get_appservice_state(service) - defer.returnValue(state == ApplicationServiceState.UP) + defer.returnValue(state == ApplicationServiceState.UP or state is None) class _Recoverer(object): @@ -208,6 +208,8 @@ class _Recoverer(object): def retry(self): txn = yield self._get_oldest_txn() if txn: + logger.info("Retrying transaction %s for service %s", + txn.id, txn.service) if txn.send(self.as_api): txn.complete(self.store) # reset the backoff counter and retry immediately diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c3c0a0bd43..ab03106513 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -337,9 +337,8 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def _populate_cache(self): """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " + "application_services_regex AS r ON a.id = r.as_id") results = yield self._execute_and_decode(sql) services = self._parse_services_dict(results) @@ -528,7 +527,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return None event_ids = json.loads(entry["event_ids"]) - events = self._get_events_txn(event_ids) + events = self._get_events_txn(txn, event_ids) return AppServiceTransaction( service=service, id=entry["txn_id"], events=events @@ -540,7 +539,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): (service_id,) ) last_txn_id = result.fetchone() - if last_txn_id is None: # no row exists + if last_txn_id is None or last_txn_id[0] is None: # no row exists return 0 else: return int(last_txn_id[0]) # select 'last_txn' col -- cgit 1.5.1 From 7e0bba555c4abeb55cffc123270ceee858839496 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:48:37 +0000 Subject: Remove unused import --- synapse/storage/appservice.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index ab03106513..fe347dfd3c 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,7 +17,6 @@ from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer -from syutil.jsonutil import encode_canonical_json from synapse.api.constants import Membership from synapse.api.errors import StoreError from synapse.appservice import ApplicationService, AppServiceTransaction -- cgit 1.5.1 From db1fbc6c6fb23ab92dd712aa60f0ff46ea76b42c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 10 Mar 2015 10:04:20 +0000 Subject: Fix remaining scheduler bugs. Add more informative logging. --- synapse/appservice/api.py | 8 +++---- synapse/appservice/scheduler.py | 52 +++++++++++++++++------------------------ synapse/storage/appservice.py | 5 ++-- 3 files changed, 28 insertions(+), 37 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3acb8867a2..2a9becccb3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) - response = None try: - response = yield self.put_json( + yield self.put_json( uri=uri, json_body={ "events": events @@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) - if response: # just an empty json object - # TODO: Mark txn as sent successfully - defer.returnValue(True) + defer.returnValue(True) + return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3ee2406463..add1e3879c 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -77,6 +77,7 @@ class AppServiceScheduler(object): @defer.inlineCallbacks def start(self): + logger.info("Starting appservice scheduler") # check for any DOWN ASes and start recoverers for them. recoverers = yield _Recoverer.start( self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered @@ -137,40 +138,33 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): self.recoverers.remove(recoverer) - logger.info("Successfully recovered application service: %s", - recoverer.service) - logger.info("Active recoverers: %s", len(self.recoverers)) - applied_state = yield self.store.set_appservice_state( + logger.info("Successfully recovered application service AS ID %s", + recoverer.service.id) + logger.info("Remaining active recoverers: %s", len(self.recoverers)) + yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - if not applied_state: - logger.error("Failed to apply appservice state UP to service %s", - recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) if len(recoverers) > 0: - logger.info("Active recoverers: %s", len(self.recoverers)) + logger.info("New active recoverers: %s", len(self.recoverers)) @defer.inlineCallbacks def _start_recoverer(self, service): - applied_state = yield self.store.set_appservice_state( + yield self.store.set_appservice_state( service, ApplicationServiceState.DOWN ) - if applied_state: - logger.info( - "Application service falling behind. Starting recoverer. %s", - service - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() - else: - logger.error("Failed to apply appservice state DOWN to service %s", - service) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() @defer.inlineCallbacks def _is_service_up(self, service): @@ -190,6 +184,8 @@ class _Recoverer(object): _Recoverer(clock, store, as_api, s, callback) for s in services ] for r in recoverers: + logger.info("Starting recoverer for AS ID %s which was marked as " + "DOWN", r.service.id) r.recover() defer.returnValue(recoverers) @@ -206,12 +202,13 @@ class _Recoverer(object): @defer.inlineCallbacks def retry(self): - txn = yield self._get_oldest_txn() + txn = yield self.store.get_oldest_unsent_txn(self.service) if txn: - logger.info("Retrying transaction %s for service %s", - txn.id, txn.service) - if txn.send(self.as_api): - txn.complete(self.store) + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 yield self.retry() @@ -225,8 +222,3 @@ class _Recoverer(object): def _set_service_recovered(self): self.callback(self) - - @defer.inlineCallbacks - def _get_oldest_txn(self): - txn = yield self.store.get_oldest_unsent_txn(self.service) - defer.returnValue(txn) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index fe347dfd3c..c4b4f56c5d 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -293,6 +293,8 @@ class ApplicationServiceStore(SQLBaseStore): services = {} for res in results: as_token = res["token"] + if as_token is None: + continue if as_token not in services: # add the service services[as_token] = { @@ -516,11 +518,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) result = txn.execute( - "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?", + "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?", (service.id,) ) entry = self.cursor_to_dict(result)[0] - if not entry or entry["txn_id"] is None: # the min(txn_id) part will force a row, so entry may not be None return None -- cgit 1.5.1 From 835e01fc7047e34a813936544027596627a112df Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 10:16:59 +0000 Subject: Minor PR comment tweaks. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/appservice.py | 10 ++++++++-- synapse/storage/__init__.py | 2 +- synapse/storage/appservice.py | 6 +++--- tests/appservice/test_scheduler.py | 10 +++++----- 5 files changed, 19 insertions(+), 13 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index add1e3879c..8a3a6a880f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -86,7 +86,7 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.on_receive(service, event) + self.event_grouper.enqueue(service, event) class _EventGrouper(object): @@ -96,7 +96,7 @@ class _EventGrouper(object): def __init__(self): self.groups = {} # dict of {service: [events]} - def on_receive(self, service, event): + def enqueue(self, service, event): if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f3cd458e6b..a24f7f5587 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,8 +27,14 @@ logger = logging.getLogger(__name__) def log_failure(failure): - logger.error("Application Services Failure: %s", failure.value) - logger.error(failure.getTraceback()) + logger.error( + "Application Services Failure", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) # NB: Purposefully not inheriting BaseHandler since that contains way too much diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index efef859214..e752b035e6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -82,7 +82,7 @@ class DataStore(RoomMemberStore, RoomStore, FilteringStore, PusherStore, PushRuleStore, - ApplicationServiceTransactionStore + ApplicationServiceTransactionStore, ): def __init__(self, hs): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 670e1d56af..e928812bc9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -365,9 +365,9 @@ class ApplicationServiceTransactionStore(SQLBaseStore): may be empty. """ sql = ( - "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN " - "application_services AS a ON a.id=s.as_id LEFT JOIN " - "application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" + " application_services AS a ON a.id=s.as_id LEFT JOIN" + " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" ) results = yield self._execute_and_decode( "get_appservices_by_state", sql, state diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index e18e879319..4534d05b93 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -213,7 +213,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def test_drain_single_event(self): service = Mock() event = Mock() - self.grouper.on_receive(service, event) + self.grouper.enqueue(service, event) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals([event], groups[service]) @@ -225,7 +225,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): service = Mock() events = [Mock(), Mock(), Mock()] for e in events: - self.grouper.on_receive(service, e) + self.grouper.enqueue(service, e) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals(events, groups[service]) @@ -243,11 +243,11 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): services[2]: events_c } for e in events_b: - self.grouper.on_receive(services[1], e) + self.grouper.enqueue(services[1], e) for e in events_c: - self.grouper.on_receive(services[2], e) + self.grouper.enqueue(services[2], e) for e in events_a: - self.grouper.on_receive(services[0], e) + self.grouper.enqueue(services[0], e) groups = self.grouper.drain_groups() for service in services: -- cgit 1.5.1 From c9c444f56260b414d474ea7e9ae28a1a66400357 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 10:38:02 +0000 Subject: Wrap polling/retry blocks in try/excepts to avoid sending to other ASes breaking permanently should an error occur. --- synapse/appservice/scheduler.py | 68 +++++++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 29 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 8a3a6a880f..59a870e271 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -120,19 +120,22 @@ class _TransactionController(object): @defer.inlineCallbacks def start_polling(self): - groups = self.event_grouper.drain_groups() - for service in groups: - txn = yield self.store.create_appservice_txn( - service=service, - events=groups[service] - ) - service_is_up = yield self._is_service_up(service) - if service_is_up: - sent = yield txn.send(self.as_api) - if sent: - txn.complete(self.store) - else: - self._start_recoverer(service) + try: + groups = self.event_grouper.drain_groups() + for service in groups: + txn = yield self.store.create_appservice_txn( + service=service, + events=groups[service] + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: + txn.complete(self.store) + else: + self._start_recoverer(service) + except Exception as e: + logger.exception(e) self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks @@ -200,25 +203,32 @@ class _Recoverer(object): def recover(self): self.clock.call_later((2 ** self.backoff_counter), self.retry) + def _backoff(self): + # cap the backoff to be around 18h => (2^16) = 65536 secs + if self.backoff_counter < 16: + self.backoff_counter += 1 + self.recover() + @defer.inlineCallbacks def retry(self): - txn = yield self.store.get_oldest_unsent_txn(self.service) - if txn: - logger.info("Retrying transaction %s for AS ID %s", - txn.id, txn.service.id) - sent = yield txn.send(self.as_api) - if sent: - yield txn.complete(self.store) - # reset the backoff counter and retry immediately - self.backoff_counter = 1 - yield self.retry() + try: + txn = yield self.store.get_oldest_unsent_txn(self.service) + if txn: + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) + # reset the backoff counter and retry immediately + self.backoff_counter = 1 + yield self.retry() + else: + self._backoff() else: - # cap the backoff to be around 18h => (2^16) = 65536 secs - if self.backoff_counter < 16: - self.backoff_counter += 1 - self.recover() - else: - self._set_service_recovered() + self._set_service_recovered() + except Exception as e: + logger.exception(e) + self._backoff() def _set_service_recovered(self): self.callback(self) -- cgit 1.5.1 From 6279285b2ad59cf003b2e8d73d30dc706e1f3e4a Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 13:15:40 +0000 Subject: Replace EventGrouper for ServiceQueuer to move to push-based txns. Fix tests and add stub tests for ServiceQueuer. --- synapse/appservice/scheduler.py | 61 +++++++++++----------- tests/appservice/test_scheduler.py | 100 ++++++++++--------------------------- 2 files changed, 60 insertions(+), 101 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 59a870e271..54c42d1b94 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -16,11 +16,11 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: - _________ ----ASa[e]-->| Event | -----ASb[e]->| Grouper |<-poll 1/s--+ ---ASa[e]--->|_________| | ASa[e,e] ASb[e] - V + __________ +1---ASa[e]-->| Service |--> Queue ASa[f] +2----ASb[e]->| Queuer | +3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e] + V -````````- +------------+ |````````|<--StoreTxn-|Transaction | |Database| | Controller |---> SEND TO AS @@ -66,14 +66,14 @@ class AppServiceScheduler(object): self.clock = clock self.store = store self.as_api = as_api - self.event_grouper = _EventGrouper() def create_recoverer(service, callback): return _Recoverer(clock, store, as_api, service, callback) self.txn_ctrl = _TransactionController( - clock, store, as_api, self.event_grouper, create_recoverer + clock, store, as_api, create_recoverer ) + self.queuer = _ServiceQueuer(self.txn_ctrl) @defer.inlineCallbacks def start(self): @@ -86,17 +86,26 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.enqueue(service, event) + self.queuer.enqueue(service, event) -class _EventGrouper(object): - """Groups events for the same application service together. +class _ServiceQueuer(object): + """Queues events for the same application service together, sending + transactions as soon as possible. Once a transaction is sent successfully, + this schedules any other events in the queue to run. """ - def __init__(self): + def __init__(self, txn_ctrl): self.groups = {} # dict of {service: [events]} + self.txn_ctrl = txn_ctrl def enqueue(self, service, event): + # if nothing in queue for this service, send event immediately and add + # callbacks. + self.txn_ctrl.send(service, [event]) + + # else add to queue for this service + if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) @@ -109,34 +118,30 @@ class _EventGrouper(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): + def __init__(self, clock, store, as_api, recoverer_fn): self.clock = clock self.store = store self.as_api = as_api - self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn # keep track of how many recoverers there are self.recoverers = [] @defer.inlineCallbacks - def start_polling(self): + def send(self, service, events): try: - groups = self.event_grouper.drain_groups() - for service in groups: - txn = yield self.store.create_appservice_txn( - service=service, - events=groups[service] - ) - service_is_up = yield self._is_service_up(service) - if service_is_up: - sent = yield txn.send(self.as_api) - if sent: - txn.complete(self.store) - else: - self._start_recoverer(service) + txn = yield self.store.create_appservice_txn( + service=service, + events=events + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: + txn.complete(self.store) + else: + self._start_recoverer(service) except Exception as e: logger.exception(e) - self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 4534d05b93..38d792eb02 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - _EventGrouper, _TransactionController, _Recoverer + _ServiceQueuer, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -28,25 +28,21 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.clock = MockClock() self.store = Mock() self.as_api = Mock() - self.event_grouper = Mock() self.recoverer = Mock() self.recoverer_fn = Mock(return_value=self.recoverer) self.txnctrl = _TransactionController( clock=self.clock, store=self.store, as_api=self.as_api, - event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + recoverer_fn=self.recoverer_fn ) - def test_poll_single_group_service_up(self): + def test_single_service_up_txn_sent(self): # Test: The AS is up and the txn is successfully sent. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events txn_id = "foobar" txn = Mock(id=txn_id, service=service, events=events) # mock methods - self.event_grouper.drain_groups = Mock(return_value=groups) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) @@ -56,7 +52,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -64,15 +60,12 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made txn.complete.assert_called_once_with(self.store) # txn completed - def test_poll_single_group_service_down(self): + def test_single_service_down(self): # Test: The AS is down so it shouldn't push; Recoverers will do it. # It should still make a transaction though. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events - self.event_grouper.drain_groups = Mock(return_value=groups) txn = Mock(id="idhere", service=service, events=events) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.DOWN) @@ -82,7 +75,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -90,18 +83,15 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.assertEquals(0, txn.send.call_count) # txn not sent though self.assertEquals(0, txn.complete.call_count) # or completed - def test_poll_single_group_service_up(self): + def test_single_service_up_txn_not_sent(self): # Test: The AS is up and the txn is not sent. A Recoverer is made and # started. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events txn_id = "foobar" txn = Mock(id=txn_id, service=service, events=events) # mock methods - self.event_grouper.drain_groups = Mock(return_value=groups) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) @@ -112,7 +102,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events @@ -125,12 +115,6 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): service, ApplicationServiceState.DOWN # service marked as down ) - def test_poll_no_groups(self): - self.as_api.push_bulk = Mock() - self.event_grouper.drain_groups = Mock(return_value={}) - self.txnctrl.start_polling() - self.assertEquals(0, self.as_api.push_bulk.call_count) - class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): @@ -205,54 +189,24 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.callback.assert_called_once_with(self.recoverer) -class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): +class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): def setUp(self): - self.grouper = _EventGrouper() - - def test_drain_single_event(self): - service = Mock() - event = Mock() - self.grouper.enqueue(service, event) - groups = self.grouper.drain_groups() - self.assertTrue(service in groups) - self.assertEquals([event], groups[service]) - self.assertEquals(1, len(groups.keys())) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) - - def test_drain_multiple_events(self): - service = Mock() - events = [Mock(), Mock(), Mock()] - for e in events: - self.grouper.enqueue(service, e) - groups = self.grouper.drain_groups() - self.assertTrue(service in groups) - self.assertEquals(events, groups[service]) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) - - def test_drain_multiple_services(self): - services = [Mock(), Mock(), Mock()] - events_a = [Mock(), Mock()] - events_b = [Mock()] - events_c = [Mock(), Mock(), Mock(), Mock()] - mappings = { - services[0]: events_a, - services[1]: events_b, - services[2]: events_c - } - for e in events_b: - self.grouper.enqueue(services[1], e) - for e in events_c: - self.grouper.enqueue(services[2], e) - for e in events_a: - self.grouper.enqueue(services[0], e) - - groups = self.grouper.drain_groups() - for service in services: - self.assertTrue(service in groups) - self.assertEquals(mappings[service], groups[service]) - self.assertEquals(3, len(groups.keys())) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) + self.txn_ctrl = Mock() + self.queuer = _ServiceQueuer(self.txn_ctrl) + + def test_send_single_event_no_queue(self): + # Expect the event to be sent immediately. + pass + + def test_send_single_event_with_queue(self): + # - Send an event and don't resolve it just yet. + # - Send another event: expect send() to NOT be called. + # - Resolve the send event + # - Expect queued event to be sent + pass + + def test_multiple_service_queues(self): + # Tests that each service has its own queue, and that they don't block + # on each other. + pass -- cgit 1.5.1 From d04fa1f7121d996e05bd4def14951d89eb47d1ab Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 14:03:16 +0000 Subject: Implement ServiceQueuer with tests. --- synapse/appservice/scheduler.py | 46 +++++++++++++++++++++----------- tests/appservice/test_scheduler.py | 54 +++++++++++++++++++++++++++++++++----- 2 files changed, 77 insertions(+), 23 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 54c42d1b94..3cedd479a2 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -83,7 +83,6 @@ class AppServiceScheduler(object): self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered ) self.txn_ctrl.add_recoverers(recoverers) - self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) @@ -96,24 +95,37 @@ class _ServiceQueuer(object): """ def __init__(self, txn_ctrl): - self.groups = {} # dict of {service: [events]} + self.queued_events = {} # dict of {service_id: [events]} + self.pending_requests = {} # dict of {service_id: Deferred} self.txn_ctrl = txn_ctrl def enqueue(self, service, event): - # if nothing in queue for this service, send event immediately and add - # callbacks. - self.txn_ctrl.send(service, [event]) - - # else add to queue for this service - - if service not in self.groups: - self.groups[service] = [] - self.groups[service].append(event) - - def drain_groups(self): - groups = self.groups - self.groups = {} - return groups + # if this service isn't being sent something + if not self.pending_requests.get(service.id): + self._send_request(service, [event]) + else: + # add to queue for this service + if service.id not in self.queued_events: + self.queued_events[service.id] = [] + self.queued_events[service.id].append(event) + + def _send_request(self, service, events): + # send request and add callbacks + d = self.txn_ctrl.send(service, events) + d.addCallback(self._on_request_finish) + d.addErrback(self._on_request_fail) + self.pending_requests[service.id] = d + + def _on_request_finish(self, service): + self.pending_requests[service.id] = None + # if there are queued events, then send them. + if (service.id in self.queued_events + and len(self.queued_events[service.id]) > 0): + self._send_request(service, self.queued_events[service.id]) + self.queued_events[service.id] = [] + + def _on_request_fail(self, err): + logger.error("AS request failed: %s", err) class _TransactionController(object): @@ -142,6 +154,8 @@ class _TransactionController(object): self._start_recoverer(service) except Exception as e: logger.exception(e) + # request has finished + defer.returnValue(service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 38d792eb02..82a5965097 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -197,16 +197,56 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): def test_send_single_event_no_queue(self): # Expect the event to be sent immediately. - pass + service = Mock(id=4) + event = Mock() + self.queuer.enqueue(service, event) + self.txn_ctrl.send.assert_called_once_with(service, [event]) def test_send_single_event_with_queue(self): - # - Send an event and don't resolve it just yet. - # - Send another event: expect send() to NOT be called. - # - Resolve the send event - # - Expect queued event to be sent - pass + d = defer.Deferred() + self.txn_ctrl.send = Mock(return_value=d) + service = Mock(id=4) + event = Mock(event_id="first") + event2 = Mock(event_id="second") + event3 = Mock(event_id="third") + # Send an event and don't resolve it just yet. + self.queuer.enqueue(service, event) + # Send more events: expect send() to NOT be called multiple times. + self.queuer.enqueue(service, event2) + self.queuer.enqueue(service, event3) + self.txn_ctrl.send.assert_called_with(service, [event]) + self.assertEquals(1, self.txn_ctrl.send.call_count) + # Resolve the send event: expect the queued events to be sent + d.callback(service) + self.txn_ctrl.send.assert_called_with(service, [event2, event3]) + self.assertEquals(2, self.txn_ctrl.send.call_count) def test_multiple_service_queues(self): # Tests that each service has its own queue, and that they don't block # on each other. - pass + srv1 = Mock(id=4) + srv_1_defer = defer.Deferred() + srv_1_event = Mock(event_id="srv1a") + srv_1_event2 = Mock(event_id="srv1b") + + srv2 = Mock(id=6) + srv_2_defer = defer.Deferred() + srv_2_event = Mock(event_id="srv2a") + srv_2_event2 = Mock(event_id="srv2b") + + send_return_list = [srv_1_defer, srv_2_defer] + self.txn_ctrl.send = Mock(side_effect=lambda x,y: send_return_list.pop(0)) + + # send events for different ASes and make sure they are sent + self.queuer.enqueue(srv1, srv_1_event) + self.queuer.enqueue(srv1, srv_1_event2) + self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event]) + self.queuer.enqueue(srv2, srv_2_event) + self.queuer.enqueue(srv2, srv_2_event2) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event]) + + # make sure callbacks for a service only send queued events for THAT + # service + srv_2_defer.callback(srv2) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2]) + self.assertEquals(3, self.txn_ctrl.send.call_count) -- cgit 1.5.1 From f0d6f724a241a50d4a12b1c00af2a4cc6f9a43f1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 15:24:32 +0000 Subject: Set the service ID as soon as it is known. --- synapse/handlers/appservice.py | 2 +- synapse/storage/appservice.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index a24f7f5587..58b5b60bb7 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -59,13 +59,13 @@ class ApplicationServicesHandler(object): ) if not stored_service: raise StoreError(404, "Application service not found") + app_service.id = stored_service.id except StoreError: raise SynapseError( 403, "Unrecognised application services token. " "Consult the home server admin.", errcode=Codes.FORBIDDEN ) - app_service.hs_token = self._generate_hs_token() # create a sender for this application service which is used when diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e928812bc9..06b3a04afc 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -101,11 +101,12 @@ class ApplicationServiceStore(SQLBaseStore): if not service.hs_token: raise StoreError(500, "No HS token") - yield self.runInteraction( + as_id = yield self.runInteraction( "update_app_service", self._update_app_service_txn, service ) + service.id = as_id # update cache TODO: Should this be in the txn? for (index, cache_service) in enumerate(self.services_cache): @@ -124,7 +125,7 @@ class ApplicationServiceStore(SQLBaseStore): "update_app_service_txn: Failed to find as_id for token=", service.token ) - return False + return txn.execute( "UPDATE application_services SET url=?, hs_token=?, sender=? " @@ -144,7 +145,7 @@ class ApplicationServiceStore(SQLBaseStore): "as_id, namespace, regex) values(?,?,?)", (as_id, ns_int, json.dumps(regex_obj)) ) - return True + return as_id def _get_as_id_txn(self, txn, token): cursor = txn.execute( -- cgit 1.5.1 From b1022ed8b5df2d9827cf0437574fce4154eb606e Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 17:28:33 +0000 Subject: func(*EXPR) is valid Python syntax, really... --- synapse/storage/_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e53630a689..2552a74f85 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -100,8 +100,7 @@ def cached(max_entries=1000, num_args=1): cache_counter.inc_misses(name) ret = yield orig(self, *keyargs) - prefill_args = keyargs + (ret,) - prefill(*prefill_args) + prefill(*keyargs + (ret,)) defer.returnValue(ret) -- cgit 1.5.1 From 0f86312c4cb262ad1b69207dd46712707dee75bb Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 18:13:49 +0000 Subject: Pull out the cache logic from the @cached wrapper into its own class we can reuse --- synapse/storage/_base.py | 89 +++++++++++++++++++++++++++------------------ tests/storage/test__base.py | 34 ++++++++++++++++- 2 files changed, 87 insertions(+), 36 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2552a74f85..27ea65a0f6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -53,6 +53,47 @@ cache_counter = metrics.register_cache( ) +class Cache(object): + + def __init__(self, name, max_entries=1000, keylen=1): + self.cache = OrderedDict() + + self.max_entries = max_entries + self.name = name + self.keylen = keylen + + caches_by_name[name] = self.cache + + def get(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if keyargs in self.cache: + cache_counter.inc_hits(self.name) + return self.cache[keyargs] + + cache_counter.inc_misses(self.name) + raise KeyError() + + def prefill(self, *args): # because I can't *keyargs, value + keyargs = args[:-1] + value = args[-1] + + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + while len(self.cache) > self.max_entries: + self.cache.popitem(last=False) + + self.cache[keyargs] = value + + def invalidate(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + self.cache.pop(keyargs, None) + + # TODO(paul): # * consider other eviction strategies - LRU? def cached(max_entries=1000, num_args=1): @@ -70,48 +111,26 @@ def cached(max_entries=1000, num_args=1): calling the calculation function. """ def wrap(orig): - cache = OrderedDict() - name = orig.__name__ - - caches_by_name[name] = cache - - def prefill(*args): # because I can't *keyargs, value - keyargs = args[:-1] - value = args[-1] - - if len(keyargs) != num_args: - raise ValueError("Expected a call to have %d arguments", num_args) - - while len(cache) > max_entries: - cache.popitem(last=False) - - cache[keyargs] = value + cache = Cache( + name=orig.__name__, + max_entries=max_entries, + keylen=num_args, + ) @functools.wraps(orig) @defer.inlineCallbacks def wrapped(self, *keyargs): - if len(keyargs) != num_args: - raise ValueError("Expected a call to have %d arguments", num_args) - - if keyargs in cache: - cache_counter.inc_hits(name) - defer.returnValue(cache[keyargs]) - - cache_counter.inc_misses(name) - ret = yield orig(self, *keyargs) - - prefill(*keyargs + (ret,)) - - defer.returnValue(ret) + try: + defer.returnValue(cache.get(*keyargs)) + except KeyError: + ret = yield orig(self, *keyargs) - def invalidate(*keyargs): - if len(keyargs) != num_args: - raise ValueError("Expected a call to have %d arguments", num_args) + cache.prefill(*keyargs + (ret,)) - cache.pop(keyargs, None) + defer.returnValue(ret) - wrapped.invalidate = invalidate - wrapped.prefill = prefill + wrapped.invalidate = cache.invalidate + wrapped.prefill = cache.prefill return wrapped return wrap diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 55d22f665a..783abc2b00 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -17,7 +17,39 @@ from tests import unittest from twisted.internet import defer -from synapse.storage._base import cached +from synapse.storage._base import Cache, cached + + +class CacheTestCase(unittest.TestCase): + + def setUp(self): + self.cache = Cache("test") + + def test_empty(self): + failed = False + try: + self.cache.get("foo") + except KeyError: + failed = True + + self.assertTrue(failed) + + def test_hit(self): + self.cache.prefill("foo", 123) + + self.assertEquals(self.cache.get("foo"), 123) + + def test_invalidate(self): + self.cache.prefill("foo", 123) + self.cache.invalidate("foo") + + failed = False + try: + self.cache.get("foo") + except KeyError: + failed = True + + self.assertTrue(failed) class CacheDecoratorTestCase(unittest.TestCase): -- cgit 1.5.1 From a63b4f71013f6a4e96b2b703c3a469fc8a9a5d57 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 17:08:15 +0000 Subject: Remember the 'last seen' time for a given user/IP/device combination and only bother INSERTing another if it's stale --- synapse/storage/__init__.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 76e7bdfaed..c69d11261c 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + +from ._base import Cache from .appservice import ApplicationServiceStore from .directory import DirectoryStore from .events import EventsStore @@ -51,6 +54,11 @@ SCHEMA_VERSION = 14 dir_path = os.path.abspath(os.path.dirname(__file__)) +# Number of msec of granularity to store the user IP 'last seen' time. Smaller +# times give more inserts into the database even for readonly API hits +# 120 seconds == 2 minutes +LAST_SEEN_GRANULARITY = 120*1000 + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, @@ -73,8 +81,28 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token_deferred = self._get_min_token() self.min_token = None + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, + ) + + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, device_id, ip, user_agent): - return self._simple_insert( + now = int(self._clock.time_msec()) + key = (user.to_string(), access_token, device_id, ip) + + try: + last_seen = self.client_ip_last_seen.get(*key) + except KeyError: + last_seen = None + + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + defer.returnValue(None) + + self.client_ip_last_seen.prefill(*key + (now,)) + + yield self._simple_insert( "user_ips", { "user": user.to_string(), @@ -82,7 +110,7 @@ class DataStore(RoomMemberStore, RoomStore, "device_id": device_id, "ip": ip, "user_agent": user_agent, - "last_seen": int(self._clock.time_msec()), + "last_seen": now, }, desc="insert_client_ip", ) -- cgit 1.5.1 From 72d84064094be60a907ca515739e2a4ea1af0bd5 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 19:21:13 +0000 Subject: Put a cache on get_aliases_for_room --- synapse/storage/directory.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 6672752fe0..0199539fea 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.errors import SynapseError @@ -106,14 +106,19 @@ class DirectoryStore(SQLBaseStore): }, desc="create_room_alias_association", ) + self.get_aliases_for_room.invalidate(room_id) + @defer.inlineCallbacks def delete_room_alias(self, room_alias): - return self.runInteraction( + room_id = yield self.runInteraction( "delete_room_alias", self._delete_room_alias_txn, room_alias, ) + self.get_aliases_for_room.invalidate(room_id) + defer.returnValue(room_id) + def _delete_room_alias_txn(self, txn, room_alias): cursor = txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", @@ -138,6 +143,7 @@ class DirectoryStore(SQLBaseStore): return room_id + @cached() def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", -- cgit 1.5.1 From ed008e85a8a2d9254d4d6f23cc7eb47ee52d0989 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Mar 2015 17:25:44 +0000 Subject: Reduce activity timer granularity to avoid too many quick updates (SYN-247) --- synapse/handlers/presence.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 731df00648..bbc7a0f200 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -33,6 +33,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) +# Don't bother bumping "last active" time if it differs by less than 60 seconds +LAST_ACTIVE_GRANULARITY = 60*1000 + + # TODO(paul): Maybe there's one of these I can steal from somewhere def partition(l, func): """Partition the list by the result of func applied to each element.""" @@ -282,6 +286,10 @@ class PresenceHandler(BaseHandler): if now is None: now = self.clock.time_msec() + prev_state = self._get_or_make_usercache(user) + if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY: + return + self.changed_presencelike_data(user, {"last_active": now}) def changed_presencelike_data(self, user, state): -- cgit 1.5.1 From d6b3ea75d4eba6961242ce68d5df90557b00609b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 19:04:59 +0000 Subject: Implement the 'key in dict' test for LruCache() --- synapse/util/lrucache.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse') diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py index 65d5792907..2f7b615f78 100644 --- a/synapse/util/lrucache.py +++ b/synapse/util/lrucache.py @@ -90,12 +90,16 @@ class LruCache(object): def cache_len(): return len(cache) + def cache_contains(key): + return key in cache + self.sentinel = object() self.get = cache_get self.set = cache_set self.setdefault = cache_set_default self.pop = cache_pop self.len = cache_len + self.contains = cache_contains def __getitem__(self, key): result = self.get(key, self.sentinel) @@ -114,3 +118,6 @@ class LruCache(object): def __len__(self): return self.len() + + def __contains__(self, key): + return self.contains(key) -- cgit 1.5.1 From 9ba6487b3fe985c4ec84b02d9804aea7e2df6c40 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 19:05:34 +0000 Subject: Allow a choice of LRU behaviour for Cache() by using LruCache() or OrderedDict() --- synapse/storage/_base.py | 20 ++++++++++++-------- tests/storage/test__base.py | 22 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 27ea65a0f6..6fa63f052e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -55,10 +55,14 @@ cache_counter = metrics.register_cache( class Cache(object): - def __init__(self, name, max_entries=1000, keylen=1): - self.cache = OrderedDict() + def __init__(self, name, max_entries=1000, keylen=1, lru=False): + if lru: + self.cache = LruCache(max_size=max_entries) + self.max_entries = None + else: + self.cache = OrderedDict() + self.max_entries = max_entries - self.max_entries = max_entries self.name = name self.keylen = keylen @@ -82,8 +86,9 @@ class Cache(object): if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) - while len(self.cache) > self.max_entries: - self.cache.popitem(last=False) + if self.max_entries is not None: + while len(self.cache) >= self.max_entries: + self.cache.popitem(last=False) self.cache[keyargs] = value @@ -94,9 +99,7 @@ class Cache(object): self.cache.pop(keyargs, None) -# TODO(paul): -# * consider other eviction strategies - LRU? -def cached(max_entries=1000, num_args=1): +def cached(max_entries=1000, num_args=1, lru=False): """ A method decorator that applies a memoizing cache around the function. The function is presumed to take zero or more arguments, which are used in @@ -115,6 +118,7 @@ def cached(max_entries=1000, num_args=1): name=orig.__name__, max_entries=max_entries, keylen=num_args, + lru=lru, ) @functools.wraps(orig) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index b6853ba2d4..96caf8c4c1 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -69,6 +69,28 @@ class CacheTestCase(unittest.TestCase): cache.get(2) cache.get(3) + def test_eviction_lru(self): + cache = Cache("test", max_entries=2, lru=True) + + cache.prefill(1, "one") + cache.prefill(2, "two") + + # Now access 1 again, thus causing 2 to be least-recently used + cache.get(1) + + cache.prefill(3, "three") + + failed = False + try: + cache.get(2) + except KeyError: + failed = True + + self.assertTrue(failed) + + cache.get(1) + cache.get(3) + class CacheDecoratorTestCase(unittest.TestCase): -- cgit 1.5.1 From 32206dde3f8dd59412490cd6f590304438c900f4 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Mar 2015 10:11:52 +0000 Subject: Fixes from PR comments --- synapse/appservice/scheduler.py | 3 ++- synapse/storage/appservice.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3cedd479a2..59b0b1f4ac 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -112,7 +112,7 @@ class _ServiceQueuer(object): def _send_request(self, service, events): # send request and add callbacks d = self.txn_ctrl.send(service, events) - d.addCallback(self._on_request_finish) + d.addBoth(self._on_request_finish) d.addErrback(self._on_request_fail) self.pending_requests[service.id] = d @@ -154,6 +154,7 @@ class _TransactionController(object): self._start_recoverer(service) except Exception as e: logger.exception(e) + self._start_recoverer(service) # request has finished defer.returnValue(service) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 06b3a04afc..93304a745f 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.services_cache = [] - self.cache_defer = self._populate_cache() + self.cache_defer = self._populate_appservice_cache() self.cache_defer.addErrback(log_failure) @defer.inlineCallbacks @@ -337,7 +337,7 @@ class ApplicationServiceStore(SQLBaseStore): return service_list @defer.inlineCallbacks - def _populate_cache(self): + def _populate_appservice_cache(self): """Populates the ApplicationServiceCache from the database.""" sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " "application_services_regex AS r ON a.id = r.as_id") -- cgit 1.5.1 From ff1fa0fbf80cbb636e4cce59846bb5dcc91ccd03 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 27 Mar 2015 15:57:16 +0000 Subject: Add another @cached wrapper, this time on get_presence_state() --- synapse/storage/presence.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 87fba55439..e6fc19ccec 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from twisted.internet import defer + +from ._base import SQLBaseStore, cached class PresenceStore(SQLBaseStore): @@ -33,6 +35,7 @@ class PresenceStore(SQLBaseStore): desc="has_presence_state", ) + @cached() def get_presence_state(self, user_localpart): return self._simple_select_one( table="presence", @@ -41,8 +44,9 @@ class PresenceStore(SQLBaseStore): desc="get_presence_state", ) + @defer.inlineCallbacks def set_presence_state(self, user_localpart, new_state): - return self._simple_update_one( + ret = yield self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -50,6 +54,8 @@ class PresenceStore(SQLBaseStore): "mtime": self._clock.time_msec()}, desc="set_presence_state", ) + self.get_presence_state.invalidate(user_localpart) + defer.returnValue(ret) def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( -- cgit 1.5.1 From 3e420aebd86dbd641ddc07039b220420a43fc39c Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 27 Mar 2015 16:16:58 +0000 Subject: Revert "Add another @cached wrapper, this time on get_presence_state()" This reverts commit ff1fa0fbf80cbb636e4cce59846bb5dcc91ccd03. --- synapse/storage/presence.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index e6fc19ccec..87fba55439 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore class PresenceStore(SQLBaseStore): @@ -35,7 +33,6 @@ class PresenceStore(SQLBaseStore): desc="has_presence_state", ) - @cached() def get_presence_state(self, user_localpart): return self._simple_select_one( table="presence", @@ -44,9 +41,8 @@ class PresenceStore(SQLBaseStore): desc="get_presence_state", ) - @defer.inlineCallbacks def set_presence_state(self, user_localpart, new_state): - ret = yield self._simple_update_one( + return self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -54,8 +50,6 @@ class PresenceStore(SQLBaseStore): "mtime": self._clock.time_msec()}, desc="set_presence_state", ) - self.get_presence_state.invalidate(user_localpart) - defer.returnValue(ret) def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( -- cgit 1.5.1 From 8366fde82f609448bd96a882c72cea7d2baa52f0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 30 Mar 2015 12:01:09 -0400 Subject: turn --disable-registration into --enable-registration, given the default is for registration to be disabled by default now. this is backwards incompatible by removing the old --disable-registration arg, but makes for a much more intuitive arg --- synapse/config/registration.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'synapse') diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 4401e774d1..a6a2d2c5e1 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -25,11 +25,11 @@ class RegistrationConfig(Config): def __init__(self, args): super(RegistrationConfig, self).__init__(args) - # `args.disable_registration` may either be a bool or a string depending - # on if the option was given a value (e.g. --disable-registration=false - # would set `args.disable_registration` to "false" not False.) - self.disable_registration = bool( - distutils.util.strtobool(str(args.disable_registration)) + # `args.enable_registration` may either be a bool or a string depending + # on if the option was given a value (e.g. --enable-registration=true + # would set `args.enable_registration` to "true" not True.) + self.disable_registration = not bool( + distutils.util.strtobool(str(args.enable_registration)) ) self.registration_shared_secret = args.registration_shared_secret @@ -39,11 +39,11 @@ class RegistrationConfig(Config): reg_group = parser.add_argument_group("registration") reg_group.add_argument( - "--disable-registration", - const=True, - default=True, + "--enable-registration", + const=False, + default=False, nargs='?', - help="Disable registration of new users.", + help="Enable registration for new users.", ) reg_group.add_argument( "--registration-shared-secret", type=str, @@ -53,8 +53,8 @@ class RegistrationConfig(Config): @classmethod def generate_config(cls, args, config_dir_path): - if args.disable_registration is None: - args.disable_registration = True + if args.enable_registration is None: + args.enable_registration = False if args.registration_shared_secret is None: args.registration_shared_secret = random_string_with_symbols(50) -- cgit 1.5.1 From af853a4cdb4d6a15a9a249da8bf1aa5be1998aae Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 09:22:31 +0100 Subject: Add AppServiceConfig --- synapse/config/appservice.py | 31 +++++++++++++++++++++++++++++++ synapse/config/homeserver.py | 3 ++- 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 synapse/config/appservice.py (limited to 'synapse') diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py new file mode 100644 index 0000000000..399a716d80 --- /dev/null +++ b/synapse/config/appservice.py @@ -0,0 +1,31 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + + +class AppServiceConfig(Config): + + def __init__(self, args): + super(AppServiceConfig, self).__init__(args) + self.app_service_config_files = args.app_service_config_files + + @classmethod + def add_arguments(cls, parser): + super(AppServiceConfig, cls).add_arguments(parser) + group = parser.add_argument_group("appservice") + group.add_argument( + "--app-service-config-files", type=str, nargs='+', + help="A list of application service config files to use." + ) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 241afdf872..3edfadb98b 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -24,12 +24,13 @@ from .email import EmailConfig from .voip import VoipConfig from .registration import RegistrationConfig from .metrics import MetricsConfig +from .appservice import AppServiceConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, EmailConfig, VoipConfig, RegistrationConfig, - MetricsConfig,): + MetricsConfig, AppServiceConfig,): pass -- cgit 1.5.1 From e7887e37a86adbdc2dcb5bd3fbaabf836b168bd8 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 09:32:40 +0100 Subject: Remove appservice REST servlets --- synapse/app/homeserver.py | 5 -- synapse/rest/appservice/__init__.py | 14 ----- synapse/rest/appservice/v1/__init__.py | 29 ---------- synapse/rest/appservice/v1/base.py | 48 ----------------- synapse/rest/appservice/v1/register.py | 99 ---------------------------------- synapse/server.py | 1 - 6 files changed, 196 deletions(-) delete mode 100644 synapse/rest/appservice/__init__.py delete mode 100644 synapse/rest/appservice/v1/__init__.py delete mode 100644 synapse/rest/appservice/v1/base.py delete mode 100644 synapse/rest/appservice/v1/register.py (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 500cae05fb..29ca720d5e 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,7 +32,6 @@ from twisted.web.resource import Resource from twisted.web.static import File from twisted.web.server import Site from synapse.http.server import JsonResource, RootRedirect -from synapse.rest.appservice.v1 import AppServiceRestResource from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.http.server_key_resource import LocalKey @@ -78,9 +77,6 @@ class SynapseHomeServer(HomeServer): def build_resource_for_federation(self): return JsonResource(self) - def build_resource_for_app_services(self): - return AppServiceRestResource(self) - def build_resource_for_web_client(self): import syweb syweb_path = os.path.dirname(syweb.__file__) @@ -141,7 +137,6 @@ class SynapseHomeServer(HomeServer): (CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()), (SERVER_KEY_PREFIX, self.get_resource_for_server_key()), (MEDIA_PREFIX, self.get_resource_for_media_repository()), - (APP_SERVICE_PREFIX, self.get_resource_for_app_services()), (STATIC_PREFIX, self.get_resource_for_static_content()), ] diff --git a/synapse/rest/appservice/__init__.py b/synapse/rest/appservice/__init__.py deleted file mode 100644 index 1a84d94cd9..0000000000 --- a/synapse/rest/appservice/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/synapse/rest/appservice/v1/__init__.py b/synapse/rest/appservice/v1/__init__.py deleted file mode 100644 index a7877609ad..0000000000 --- a/synapse/rest/appservice/v1/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from . import register - -from synapse.http.server import JsonResource - - -class AppServiceRestResource(JsonResource): - """A resource for version 1 of the matrix application service API.""" - - def __init__(self, hs): - JsonResource.__init__(self, hs) - self.register_servlets(self, hs) - - @staticmethod - def register_servlets(appservice_resource, hs): - register.register_servlets(hs, appservice_resource) diff --git a/synapse/rest/appservice/v1/base.py b/synapse/rest/appservice/v1/base.py deleted file mode 100644 index 65d5bcf9be..0000000000 --- a/synapse/rest/appservice/v1/base.py +++ /dev/null @@ -1,48 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains base REST classes for constructing client v1 servlets. -""" - -from synapse.http.servlet import RestServlet -from synapse.api.urls import APP_SERVICE_PREFIX -import re - -import logging - - -logger = logging.getLogger(__name__) - - -def as_path_pattern(path_regex): - """Creates a regex compiled appservice path with the correct path - prefix. - - Args: - path_regex (str): The regex string to match. This should NOT have a ^ - as this will be prefixed. - Returns: - SRE_Pattern - """ - return re.compile("^" + APP_SERVICE_PREFIX + path_regex) - - -class AppServiceRestServlet(RestServlet): - """A base Synapse REST Servlet for the application services version 1 API. - """ - - def __init__(self, hs): - self.hs = hs - self.handler = hs.get_handlers().appservice_handler diff --git a/synapse/rest/appservice/v1/register.py b/synapse/rest/appservice/v1/register.py deleted file mode 100644 index ea24d88f79..0000000000 --- a/synapse/rest/appservice/v1/register.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains REST servlets to do with registration: /register""" -from twisted.internet import defer - -from base import AppServiceRestServlet, as_path_pattern -from synapse.api.errors import CodeMessageException, SynapseError -from synapse.storage.appservice import ApplicationService - -import json -import logging - -logger = logging.getLogger(__name__) - - -class RegisterRestServlet(AppServiceRestServlet): - """Handles AS registration with the home server. - """ - - PATTERN = as_path_pattern("/register$") - - @defer.inlineCallbacks - def on_POST(self, request): - params = _parse_json(request) - - # sanity check required params - try: - as_token = params["as_token"] - as_url = params["url"] - if (not isinstance(as_token, basestring) or - not isinstance(as_url, basestring)): - raise ValueError - except (KeyError, ValueError): - raise SynapseError( - 400, "Missed required keys: as_token(str) / url(str)." - ) - - try: - app_service = ApplicationService( - as_token, as_url, params["namespaces"] - ) - except ValueError as e: - raise SynapseError(400, e.message) - - app_service = yield self.handler.register(app_service) - hs_token = app_service.hs_token - - defer.returnValue((200, { - "hs_token": hs_token - })) - - -class UnregisterRestServlet(AppServiceRestServlet): - """Handles AS registration with the home server. - """ - - PATTERN = as_path_pattern("/unregister$") - - def on_POST(self, request): - params = _parse_json(request) - try: - as_token = params["as_token"] - if not isinstance(as_token, basestring): - raise ValueError - except (KeyError, ValueError): - raise SynapseError(400, "Missing required key: as_token(str)") - - yield self.handler.unregister(as_token) - - raise CodeMessageException(500, "Not implemented") - - -def _parse_json(request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise SynapseError(400, "Content must be a JSON object.") - return content - except ValueError as e: - logger.warn(e) - raise SynapseError(400, "Content not JSON.") - - -def register_servlets(hs, http_server): - RegisterRestServlet(hs).register(http_server) - UnregisterRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index c7772244ba..0bd87bdd77 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -79,7 +79,6 @@ class BaseHomeServer(object): 'resource_for_content_repo', 'resource_for_server_key', 'resource_for_media_repository', - 'resource_for_app_services', 'resource_for_metrics', 'event_sources', 'ratelimiter', -- cgit 1.5.1 From d33ae65efc14a18a8a690d39d6e9c81aaafa1062 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 11:00:00 +0100 Subject: Remove more reg/unreg methods. Read config not database for cache. --- synapse/handlers/appservice.py | 37 ------- synapse/storage/appservice.py | 219 +++++++++++---------------------------- tests/storage/test_appservice.py | 39 ------- 3 files changed, 59 insertions(+), 236 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 58b5b60bb7..59cf15b037 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -16,10 +16,8 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import Codes, StoreError, SynapseError from synapse.appservice import ApplicationService from synapse.types import UserID -import synapse.util.stringutils as stringutils import logging @@ -49,38 +47,6 @@ class ApplicationServicesHandler(object): self.scheduler = appservice_scheduler self.started_scheduler = False - @defer.inlineCallbacks - def register(self, app_service): - logger.info("Register -> %s", app_service) - # check the token is recognised - try: - stored_service = yield self.store.get_app_service_by_token( - app_service.token - ) - if not stored_service: - raise StoreError(404, "Application service not found") - app_service.id = stored_service.id - except StoreError: - raise SynapseError( - 403, "Unrecognised application services token. " - "Consult the home server admin.", - errcode=Codes.FORBIDDEN - ) - app_service.hs_token = self._generate_hs_token() - - # create a sender for this application service which is used when - # creating rooms, etc.. - account = yield self.hs.get_handlers().registration_handler.register() - app_service.sender = account[0] - - yield self.store.update_app_service(app_service) - defer.returnValue(app_service) - - @defer.inlineCallbacks - def unregister(self, token): - logger.info("Unregister as_token=%s", token) - yield self.store.unregister_app_service(token) - @defer.inlineCallbacks def notify_interested_services(self, event): """Notifies (pushes) all application services interested in this event. @@ -223,6 +189,3 @@ class ApplicationServicesHandler(object): exists = yield self.query_user_exists(user_id) defer.returnValue(exists) defer.returnValue(True) - - def _generate_hs_token(self): - return stringutils.random_string(24) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 93304a745f..fe9372a7c6 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import yaml from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import StoreError from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -27,141 +27,18 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) -def log_failure(failure): - logger.error("Failed to detect application services: %s", failure.value) - logger.error(failure.getTraceback()) - - class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.services_cache = [] - self.cache_defer = self._populate_appservice_cache() - self.cache_defer.addErrback(log_failure) - - @defer.inlineCallbacks - def unregister_app_service(self, token): - """Unregisters this service. - - This removes all AS specific regex and the base URL. The token is the - only thing preserved for future registration attempts. - """ - yield self.cache_defer # make sure the cache is ready - yield self.runInteraction( - "unregister_app_service", - self._unregister_app_service_txn, - token, - ) - # update cache TODO: Should this be in the txn? - for service in self.services_cache: - if service.token == token: - service.url = None - service.namespaces = None - service.hs_token = None - - def _unregister_app_service_txn(self, txn, token): - # kill the url to prevent pushes - txn.execute( - "UPDATE application_services SET url=NULL WHERE token=?", - (token,) - ) - - # cleanup regex - as_id = self._get_as_id_txn(txn, token) - if not as_id: - logger.warning( - "unregister_app_service_txn: Failed to find as_id for token=", - token - ) - return False - - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) - ) - return True - - @defer.inlineCallbacks - def update_app_service(self, service): - """Update an application service, clobbering what was previously there. - - Args: - service(ApplicationService): The updated service. - """ - yield self.cache_defer # make sure the cache is ready - - # NB: There is no "insert" since we provide no public-facing API to - # allocate new ASes. It relies on the server admin inserting the AS - # token into the database manually. - - if not service.token or not service.url: - raise StoreError(400, "Token and url must be specified.") - - if not service.hs_token: - raise StoreError(500, "No HS token") - - as_id = yield self.runInteraction( - "update_app_service", - self._update_app_service_txn, - service + self._populate_appservice_cache( + hs.config.app_service_config_files ) - service.id = as_id - # update cache TODO: Should this be in the txn? - for (index, cache_service) in enumerate(self.services_cache): - if service.token == cache_service.token: - self.services_cache[index] = service - logger.info("Updated: %s", service) - return - # new entry - self.services_cache.append(service) - logger.info("Updated(new): %s", service) - - def _update_app_service_txn(self, txn, service): - as_id = self._get_as_id_txn(txn, service.token) - if not as_id: - logger.warning( - "update_app_service_txn: Failed to find as_id for token=", - service.token - ) - return - - txn.execute( - "UPDATE application_services SET url=?, hs_token=?, sender=? " - "WHERE id=?", - (service.url, service.hs_token, service.sender, as_id,) - ) - # cleanup regex - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) - ) - for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST): - if ns_str in service.namespaces: - for regex_obj in service.namespaces[ns_str]: - txn.execute( - "INSERT INTO application_services_regex(" - "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, json.dumps(regex_obj)) - ) - return as_id - - def _get_as_id_txn(self, txn, token): - cursor = txn.execute( - "SELECT id FROM application_services WHERE token=?", - (token,) - ) - res = cursor.fetchone() - if res: - return res[0] - - @defer.inlineCallbacks def get_app_services(self): - yield self.cache_defer # make sure the cache is ready - defer.returnValue(self.services_cache) + defer.succeed(self.services_cache) - @defer.inlineCallbacks def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -175,37 +52,24 @@ class ApplicationServiceStore(SQLBaseStore): Returns: synapse.appservice.ApplicationService or None. """ - - yield self.cache_defer # make sure the cache is ready - for service in self.services_cache: if service.sender == user_id: - defer.returnValue(service) + defer.succeed(service) return - defer.returnValue(None) + defer.succeed(None) - @defer.inlineCallbacks - def get_app_service_by_token(self, token, from_cache=True): + def get_app_service_by_token(self, token): """Get the application service with the given appservice token. Args: token (str): The application service token. - from_cache (bool): True to get this service from the cache, False to - check the database. - Raises: - StoreError if there was a problem retrieving this service. + Returns: + synapse.appservice.ApplicationService or None. """ - yield self.cache_defer # make sure the cache is ready - - if from_cache: - for service in self.services_cache: - if service.token == token: - defer.returnValue(service) - return - defer.returnValue(None) - - # TODO: The from_cache=False impl - # TODO: This should be JOINed with the application_services_regex table. + for service in self.services_cache: + if service.token == token: + return defer.succeed(service) + defer.succeed(None) def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -336,18 +200,53 @@ class ApplicationServiceStore(SQLBaseStore): )) return service_list - @defer.inlineCallbacks - def _populate_appservice_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " - "application_services_regex AS r ON a.id = r.as_id") - - results = yield self._execute_and_decode("appservice_cache", sql) - services = self._parse_services_dict(results) + def _load_appservice(self, as_info): + required_string_fields = ["url", "as_token", "hs_token", "sender"] + for field in required_string_fields: + if not isinstance(as_info.get(field), basestring): + raise KeyError("Required string field: '%s'", field) + + # namespace checks + if not isinstance(as_info.get("namespaces"), dict): + raise KeyError("Requires 'namespaces' object.") + for ns in ApplicationService.NS_LIST: + # specific namespaces are optional + if ns in as_info["namespaces"]: + # expect a list of dicts with exclusive and regex keys + for regex_obj in as_info["namespaces"][ns]: + if not isinstance(regex_obj, dict): + raise ValueError( + "Expected namespace entry in %s to be an object," + " but got %s", ns, regex_obj + ) + if not isinstance(regex_obj.get("regex"), basestring): + raise ValueError( + "Missing/bad type 'regex' key in %s", regex_obj + ) + if not isinstance(regex_obj.get("exclusive"), bool): + raise ValueError( + "Missing/bad type 'exclusive' key in %s", regex_obj + ) + return ApplicationService( + token=as_info["as_token"], + url=as_info["url"], + namespaces=as_info["namespaces"], + hs_token=as_info["hs_token"], + sender=as_info["sender"] + ) - for service in services: - logger.info("Found application service: %s", service) - self.services_cache.append(service) + def _populate_appservice_cache(self, config_files): + """Populates a cache of Application Services from the config files.""" + for config_file in config_files: + try: + with open(config_file, 'r') as f: + as_info = yaml.load(f) + appservice = self._load_appservice(as_info) + logger.info("Loaded application service: %s", appservice) + self.services_cache.append(appservice) + except Exception as e: + logger.error("Failed to load appservice from '%s'", config_file) + logger.exception(e) class ApplicationServiceTransactionStore(SQLBaseStore): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index e79599f7fb..82bfea15a6 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -49,45 +49,6 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): # must be done after inserts self.store = ApplicationServiceStore(hs) - @defer.inlineCallbacks - def test_update_and_retrieval_of_service(self): - url = "https://matrix.org/appservices/foobar" - hs_token = "hstok" - user_regex = [ - {"regex": "@foobar_.*:matrix.org", "exclusive": True} - ] - alias_regex = [ - {"regex": "#foobar_.*:matrix.org", "exclusive": False} - ] - room_regex = [ - - ] - service = ApplicationService( - url=url, hs_token=hs_token, token=self.as_token, namespaces={ - ApplicationService.NS_USERS: user_regex, - ApplicationService.NS_ALIASES: alias_regex, - ApplicationService.NS_ROOMS: room_regex - }) - yield self.store.update_app_service(service) - - stored_service = yield self.store.get_app_service_by_token( - self.as_token - ) - self.assertEquals(stored_service.token, self.as_token) - self.assertEquals(stored_service.url, url) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_ALIASES], - alias_regex - ) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_ROOMS], - room_regex - ) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_USERS], - user_regex - ) - @defer.inlineCallbacks def test_retrieve_unknown_service_token(self): service = yield self.store.get_app_service_by_token("invalid_token") -- cgit 1.5.1 From b59aa745560608c8503421bd9542c99fc1c571b5 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 11:35:45 +0100 Subject: Fix tests and missing returns on deferreds. --- synapse/appservice/__init__.py | 2 +- synapse/storage/appservice.py | 18 +++++++++++------- tests/storage/test_appservice.py | 41 ++++++++++++++++++++++++++++------------ 3 files changed, 41 insertions(+), 20 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index c60db16b74..a8108c1efb 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -95,7 +95,7 @@ class ApplicationService(object): # rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...], # } if not namespaces: - return None + namespaces = {} for ns in ApplicationService.NS_LIST: if ns not in namespaces: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index fe9372a7c6..a520a859d3 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): ) def get_app_services(self): - defer.succeed(self.services_cache) + return defer.succeed(self.services_cache) def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -54,9 +54,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.sender == user_id: - defer.succeed(service) - return - defer.succeed(None) + return defer.succeed(service) + return defer.succeed(None) def get_app_service_by_token(self, token): """Get the application service with the given appservice token. @@ -69,7 +68,7 @@ class ApplicationServiceStore(SQLBaseStore): for service in self.services_cache: if service.token == token: return defer.succeed(service) - defer.succeed(None) + return defer.succeed(None) def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -237,11 +236,16 @@ class ApplicationServiceStore(SQLBaseStore): def _populate_appservice_cache(self, config_files): """Populates a cache of Application Services from the config files.""" + if not isinstance(config_files, list): + logger.warning( + "Expected %s to be a list of AS config files.", config_files + ) + return + for config_file in config_files: try: with open(config_file, 'r') as f: - as_info = yaml.load(f) - appservice = self._load_appservice(as_info) + appservice = self._load_appservice(yaml.load(f)) logger.info("Loaded application service: %s", appservice) self.services_cache.append(appservice) except Exception as e: diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 82bfea15a6..b856438fd2 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -22,6 +22,8 @@ from synapse.storage.appservice import ( ) import json +import os +import yaml from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -30,25 +32,40 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): + self.as_yaml_files = [] db_pool = SQLiteMemoryDbPool() yield db_pool.prepare() hs = HomeServer( - "test", db_pool=db_pool, clock=MockClock(), config=Mock() + "test", db_pool=db_pool, clock=MockClock(), + config=Mock( + app_service_config_files=self.as_yaml_files + ) ) + self.as_token = "token1" - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", - (self.as_token,) - ) - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", ("token2",) - ) - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", ("token3",) - ) + self.as_url = "some_url" + self._add_appservice(self.as_token, self.as_url, "some_hs_token", "bob") + self._add_appservice("token2", "some_url", "some_hs_token", "bob") + self._add_appservice("token3", "some_url", "some_hs_token", "bob") # must be done after inserts self.store = ApplicationServiceStore(hs) + def tearDown(self): + # TODO: suboptimal that we need to create files for tests! + for f in self.as_yaml_files: + try: + os.remove(f) + except: + pass + + def _add_appservice(self, as_token, url, hs_token, sender): + as_yaml = dict(url=url, as_token=as_token, hs_token=hs_token, + sender=sender, namespaces={}) + # use the token as the filename + with open(as_token, 'w') as outfile: + outfile.write(yaml.dump(as_yaml)) + self.as_yaml_files.append(as_token) + @defer.inlineCallbacks def test_retrieve_unknown_service_token(self): service = yield self.store.get_app_service_by_token("invalid_token") @@ -60,7 +77,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): self.as_token ) self.assertEquals(stored_service.token, self.as_token) - self.assertEquals(stored_service.url, None) + self.assertEquals(stored_service.url, self.as_url) self.assertEquals( stored_service.namespaces[ApplicationService.NS_ALIASES], [] -- cgit 1.5.1 From c217504949a90712f41a0422215f923b4d114a17 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 12:07:56 +0100 Subject: Edit SQL schema to use string IDs not ints. Use token as ID. Update tests. --- synapse/storage/appservice.py | 23 ++++++++----- .../storage/schema/delta/15/appservice_txns.sql | 7 ++-- tests/storage/test_appservice.py | 38 +++++++++++++--------- 3 files changed, 40 insertions(+), 28 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a520a859d3..a8780eca1e 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -231,7 +231,8 @@ class ApplicationServiceStore(SQLBaseStore): url=as_info["url"], namespaces=as_info["namespaces"], hs_token=as_info["hs_token"], - sender=as_info["sender"] + sender=as_info["sender"], + id=as_info["as_token"] # the token is the only unique thing here ) def _populate_appservice_cache(self, config_files): @@ -268,16 +269,20 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to a list of ApplicationServices, which may be empty. """ - sql = ( - "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" - " application_services AS a ON a.id=s.as_id LEFT JOIN" - " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" - ) - results = yield self._execute_and_decode( - "get_appservices_by_state", sql, state + results = yield self._simple_select_list( + "application_services_state", + dict(state=state), + ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore - defer.returnValue(self._parse_services_dict(results)) + as_list = yield self.get_app_services() + services = [] + + for res in results: + for service in as_list: + if service.id == res["as_id"]: + services.append(service) + defer.returnValue(services) @defer.inlineCallbacks def get_appservice_state(self, service): diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 13bbb2de2e..2b27e2a429 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -14,14 +14,13 @@ */ CREATE TABLE IF NOT EXISTS application_services_state( - as_id INTEGER PRIMARY KEY, + as_id TEXT PRIMARY KEY, state TEXT, - last_txn TEXT, - FOREIGN KEY(as_id) REFERENCES application_services(id) + last_txn TEXT ); CREATE TABLE IF NOT EXISTS application_services_txns( - as_id INTEGER NOT NULL, + as_id TEXT NOT NULL, txn_id INTEGER NOT NULL, event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index b856438fd2..58551e40b9 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -101,42 +101,48 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): + self.as_yaml_files = [] self.db_pool = SQLiteMemoryDbPool() yield self.db_pool.prepare() - hs = HomeServer( - "test", db_pool=self.db_pool, clock=MockClock(), config=Mock() - ) self.as_list = [ { "token": "token1", "url": "https://matrix-as.org", - "id": 3 + "id": "token1" }, { "token": "alpha_tok", "url": "https://alpha.com", - "id": 5 + "id": "alpha_tok" }, { "token": "beta_tok", "url": "https://beta.com", - "id": 6 + "id": "beta_tok" }, { "token": "delta_tok", "url": "https://delta.com", - "id": 7 + "id": "delta_tok" }, ] for s in self.as_list: - yield self._add_service(s["id"], s["url"], s["token"]) - self.store = TestTransactionStore(hs) + yield self._add_service(s["url"], s["token"]) - def _add_service(self, as_id, url, token): - return self.db_pool.runQuery( - "INSERT INTO application_services(id, url, token) VALUES(?,?,?)", - (as_id, url, token) + hs = HomeServer( + "test", db_pool=self.db_pool, clock=MockClock(), config=Mock( + app_service_config_files=self.as_yaml_files + ) ) + self.store = TestTransactionStore(hs) + + def _add_service(self, url, as_token): + as_yaml = dict(url=url, as_token=as_token, hs_token="something", + sender="a_sender", namespaces={}) + # use the token as the filename + with open(as_token, 'w') as outfile: + outfile.write(yaml.dump(as_yaml)) + self.as_yaml_files.append(as_token) def _set_state(self, id, state, txn=None): return self.db_pool.runQuery( @@ -388,8 +394,10 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ApplicationServiceState.DOWN ) self.assertEquals(2, len(services)) - self.assertEquals(self.as_list[2]["id"], services[0].id) - self.assertEquals(self.as_list[0]["id"], services[1].id) + self.assertEquals( + set([self.as_list[2]["id"], self.as_list[0]["id"]]), + set([services[0].id, services[1].id]) + ) # required for ApplicationServiceTransactionStoreTestCase tests -- cgit 1.5.1 From 3470cb36a81052d4968d109f99ecbad210b0c820 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 13:03:31 +0100 Subject: Pyflakes --- synapse/app/homeserver.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 29ca720d5e..afb46d2e23 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -38,8 +38,7 @@ from synapse.http.server_key_resource import LocalKey from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.api.urls import ( CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, APP_SERVICE_PREFIX, - STATIC_PREFIX + SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, STATIC_PREFIX ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -- cgit 1.5.1 From cf1fa59f4b72dbf5c9d735eaf051f1456721d91f Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 13:48:03 +0100 Subject: Use a sender localpart instead of a user ID. Form the user ID at runtime instead, This gives less room for error in AS config files since they cannot specify the domain of another HS. --- synapse/storage/appservice.py | 11 +++++++++-- tests/storage/test_appservice.py | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a8780eca1e..557e377ca5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser +from synapse.types import UserID from ._base import SQLBaseStore @@ -31,6 +32,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) + self.hostname = hs.hostname self.services_cache = [] self._populate_appservice_cache( hs.config.app_service_config_files @@ -200,11 +202,16 @@ class ApplicationServiceStore(SQLBaseStore): return service_list def _load_appservice(self, as_info): - required_string_fields = ["url", "as_token", "hs_token", "sender"] + required_string_fields = [ + "url", "as_token", "hs_token", "sender_localpart" + ] for field in required_string_fields: if not isinstance(as_info.get(field), basestring): raise KeyError("Required string field: '%s'", field) + user = UserID(as_info["sender_localpart"], self.hostname) + user_id = user.to_string() + # namespace checks if not isinstance(as_info.get("namespaces"), dict): raise KeyError("Requires 'namespaces' object.") @@ -231,7 +238,7 @@ class ApplicationServiceStore(SQLBaseStore): url=as_info["url"], namespaces=as_info["namespaces"], hs_token=as_info["hs_token"], - sender=as_info["sender"], + sender=user_id, id=as_info["as_token"] # the token is the only unique thing here ) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 58551e40b9..675959c56c 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -60,7 +60,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): def _add_appservice(self, as_token, url, hs_token, sender): as_yaml = dict(url=url, as_token=as_token, hs_token=hs_token, - sender=sender, namespaces={}) + sender_localpart=sender, namespaces={}) # use the token as the filename with open(as_token, 'w') as outfile: outfile.write(yaml.dump(as_yaml)) @@ -138,7 +138,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): def _add_service(self, url, as_token): as_yaml = dict(url=url, as_token=as_token, hs_token="something", - sender="a_sender", namespaces={}) + sender_localpart="a_sender", namespaces={}) # use the token as the filename with open(as_token, 'w') as outfile: outfile.write(yaml.dump(as_yaml)) -- cgit 1.5.1 From 5e88a09a424b8ce65bfe9a809cfd245286474de3 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 14:00:25 +0100 Subject: Add same user_id char checks as registration. --- synapse/storage/appservice.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 557e377ca5..f8cbb3f323 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import urllib import yaml from simplejson import JSONDecodeError import simplejson as json @@ -209,7 +210,12 @@ class ApplicationServiceStore(SQLBaseStore): if not isinstance(as_info.get(field), basestring): raise KeyError("Required string field: '%s'", field) - user = UserID(as_info["sender_localpart"], self.hostname) + localpart = as_info["sender_localpart"] + if urllib.quote(localpart) != localpart: + raise ValueError( + "sender_localpart needs characters which are not URL encoded." + ) + user = UserID(localpart, self.hostname) user_id = user.to_string() # namespace checks -- cgit 1.5.1 From 09cbff174a01757d10107b7960972a484153323e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 16:43:49 +0100 Subject: Fix thinko whereby events *for the AS specifically* were not passed on. This was caused by not explicitly checking the service.sender field. This has now been fixed and a regression test has been added. --- synapse/appservice/__init__.py | 5 ++++- tests/appservice/test_appservice.py | 13 +++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index c60db16b74..4a6cdbc2be 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -199,7 +199,10 @@ class ApplicationService(object): return self._matches_user(event, member_list) def is_interested_in_user(self, user_id): - return self._matches_regex(user_id, ApplicationService.NS_USERS) + return ( + self._matches_regex(user_id, ApplicationService.NS_USERS) + or user_id == self.sender + ) def is_interested_in_alias(self, alias): return self._matches_regex(alias, ApplicationService.NS_ALIASES) diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index eb7becf725..62149d6902 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -199,6 +199,19 @@ class ApplicationServiceTestCase(unittest.TestCase): aliases_for_event=["#xmpp_barfoo:matrix.org"] )) + def test_interested_in_self(self): + # make sure invites get through + self.service.sender = "@appservice:name" + self.service.namespaces[ApplicationService.NS_USERS].append( + _regex("@irc_.*") + ) + self.event.type = "m.room.member" + self.event.content = { + "membership": "invite" + } + self.event.state_key = self.service.sender + self.assertTrue(self.service.is_interested(self.event)) + def test_member_list_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") -- cgit 1.5.1 From 813e54bd5b332e4514ecfea71d33d27f106fe5ff Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 1 Apr 2015 14:05:24 +0100 Subject: Fix more AS sender ID thinkos. Specifically, the ASes own user ID wasn't being treated as 'exclusive' so a human could nab it. Also, the HS would needlessly send user queries to the AS for its own user ID. --- synapse/appservice/__init__.py | 5 ++++- synapse/handlers/appservice.py | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index ab0a6955f0..63a18b802b 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -211,7 +211,10 @@ class ApplicationService(object): return self._matches_regex(room_id, ApplicationService.NS_ROOMS) def is_exclusive_user(self, user_id): - return self._is_exclusive(ApplicationService.NS_USERS, user_id) + return ( + self._is_exclusive(ApplicationService.NS_USERS, user_id) + or user_id == self.sender + ) def is_exclusive_alias(self, alias): return self._is_exclusive(ApplicationService.NS_ALIASES, alias) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 59cf15b037..492a630fdc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -180,7 +180,14 @@ class ApplicationServicesHandler(object): return user_info = yield self.store.get_user_by_id(user_id) - defer.returnValue(len(user_info) == 0) + if len(user_info) > 0: + defer.returnValue(False) + return + + # user not found; could be the AS though, so check. + services = yield self.store.get_app_services() + service_list = [s for s in services if s.sender == user_id] + defer.returnValue(len(service_list) == 0) @defer.inlineCallbacks def _check_user_exists(self, user_id): -- cgit 1.5.1 From 5583e29513f1f67012b98b430670d645928d4195 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 1 Apr 2015 19:04:55 +0100 Subject: Report process open filehandles in metrics --- synapse/metrics/__init__.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) (limited to 'synapse') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index dffb8a4861..6564b03eee 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,6 +18,8 @@ from __future__ import absolute_import import logging from resource import getrusage, getpagesize, RUSAGE_SELF +import os +import stat from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric @@ -109,3 +111,35 @@ resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000) # pages resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * PAGE_SIZE) + +TYPES = { + stat.S_IFSOCK: "SOCK", + stat.S_IFLNK: "LNK", + stat.S_IFREG: "REG", + stat.S_IFBLK: "BLK", + stat.S_IFDIR: "DIR", + stat.S_IFCHR: "CHR", + stat.S_IFIFO: "FIFO", +} + +def _process_fds(): + counts = {(k,): 0 for k in TYPES.values()} + counts[("other",)] = 0 + + for fd in os.listdir("/proc/self/fd"): + try: + s = os.stat("/proc/self/fd/%s" % (fd)) + fmt = stat.S_IFMT(s.st_mode) + if fmt in TYPES: + t = TYPES[fmt] + else: + t = "other" + + counts[(t,)] += 1 + except OSError: + # the dirh itself used by listdir() is usually missing by now + pass + + return counts + +get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"]) -- cgit 1.5.1 From ef1e019840ee8ba17b45754ef223a710ce23553c Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 1 Apr 2015 19:17:38 +0100 Subject: Appease pep8 --- synapse/metrics/__init__.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 6564b03eee..9233ea3da9 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -122,6 +122,7 @@ TYPES = { stat.S_IFIFO: "FIFO", } + def _process_fds(): counts = {(k,): 0 for k in TYPES.values()} counts[("other",)] = 0 -- cgit 1.5.1