From ef995e69460a117e78a72bcef285f9a0c7438487 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Feb 2015 14:47:59 +0000 Subject: Add looping_call to Clock --- synapse/util/__init__.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 4e837a918e..fee76b0a9b 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -15,7 +15,7 @@ from synapse.util.logcontext import LoggingContext -from twisted.internet import reactor +from twisted.internet import reactor, task import time @@ -35,6 +35,14 @@ class Clock(object): """Returns the current system time in miliseconds since epoch.""" return self.time() * 1000 + def looping_call(self, f, msec): + l = task.LoopingCall(f) + l.start(msec/1000.0, now=False) + return l + + def stop_looping_call(self, loop): + loop.stop() + def call_later(self, delay, callback): current_context = LoggingContext.current_context() -- cgit 1.4.1 From 0e6b3e4e40ae918eacdef87bb50ff1d19b304e7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Feb 2015 18:17:27 +0000 Subject: Time out HTTP federation requests --- synapse/http/matrixfederationclient.py | 11 +++++++++-- synapse/util/async.py | 20 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) (limited to 'synapse/util') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index aa14782b0f..fdc1e2678e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http.agent_name import AGENT_NAME from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.async import sleep +from synapse.util.async import sleep, time_bound_deferred from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json @@ -78,6 +78,7 @@ class MatrixFederationHttpClient(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.agent = MatrixFederationHttpAgent(reactor) + self.clock = hs.get_clock() @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, @@ -117,7 +118,7 @@ class MatrixFederationHttpClient(object): try: with PreserveLoggingContext(): - response = yield self.agent.request( + request_deferred = self.agent.request( destination, endpoint, method, @@ -128,6 +129,12 @@ class MatrixFederationHttpClient(object): producer ) + response = yield time_bound_deferred( + request_deferred, + clock=self.clock, + time_out=60, + ) + logger.debug("Got response to %s", method) break except Exception as e: diff --git a/synapse/util/async.py b/synapse/util/async.py index c4fe5d522f..d4d1d4b472 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -32,3 +32,23 @@ def run_on_reactor(): iteration of the main loop """ return sleep(0) + + +def time_bound_deferred(given_deferred, clock, time_out): + ret_deferred = defer.Deferred() + + def timed_out(): + if not given_deferred.called: + given_deferred.cancel() + ret_deferred.errback(RuntimeError("Timed out")) + + timer = clock.call_later(time_out, timed_out) + + def succeed(result): + clock.cancel_call_later(timer) + ret_deferred.callback(result) + + given_deferred.addCallback(succeed) + given_deferred.addErrback(ret_deferred.errback) + + return ret_deferred -- cgit 1.4.1 From dcf52469e821b3b2b69a0610c2c4f025a5aac68f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 10:25:06 +0000 Subject: Move time_bound_deferred into Clock --- synapse/http/matrixfederationclient.py | 5 ++--- synapse/util/__init__.py | 21 ++++++++++++++++++++- synapse/util/async.py | 20 -------------------- 3 files changed, 22 insertions(+), 24 deletions(-) (limited to 'synapse/util') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index fdc1e2678e..74e523960f 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http.agent_name import AGENT_NAME from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.async import sleep, time_bound_deferred +from synapse.util.async import sleep from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json @@ -129,9 +129,8 @@ class MatrixFederationHttpClient(object): producer ) - response = yield time_bound_deferred( + response = yield self.clock.time_bound_deferred( request_deferred, - clock=self.clock, time_out=60, ) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 4e837a918e..2da8dfa719 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -15,7 +15,7 @@ from synapse.util.logcontext import LoggingContext -from twisted.internet import reactor +from twisted.internet import defer, reactor import time @@ -45,3 +45,22 @@ class Clock(object): def cancel_call_later(self, timer): timer.cancel() + + def time_bound_deferred(self, given_deferred, time_out): + ret_deferred = defer.Deferred() + + def timed_out(): + if not given_deferred.called: + given_deferred.cancel() + ret_deferred.errback(RuntimeError("Timed out")) + + timer = self.call_later(time_out, timed_out) + + def succeed(result): + self.cancel_call_later(timer) + ret_deferred.callback(result) + + given_deferred.addCallback(succeed) + given_deferred.addErrback(ret_deferred.errback) + + return ret_deferred diff --git a/synapse/util/async.py b/synapse/util/async.py index d4d1d4b472..c4fe5d522f 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -32,23 +32,3 @@ def run_on_reactor(): iteration of the main loop """ return sleep(0) - - -def time_bound_deferred(given_deferred, clock, time_out): - ret_deferred = defer.Deferred() - - def timed_out(): - if not given_deferred.called: - given_deferred.cancel() - ret_deferred.errback(RuntimeError("Timed out")) - - timer = clock.call_later(time_out, timed_out) - - def succeed(result): - clock.cancel_call_later(timer) - ret_deferred.callback(result) - - given_deferred.addCallback(succeed) - given_deferred.addErrback(ret_deferred.errback) - - return ret_deferred -- cgit 1.4.1 From d8324d5a2b9ffc6f3a426ecd240f6c2460630025 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 11 Feb 2015 14:52:23 +0000 Subject: Add a lru cache class --- synapse/util/lrucache.py | 110 ++++++++++++++++++++++++++++++++++++++++++++ tests/util/test_lrucache.py | 56 ++++++++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 synapse/util/lrucache.py create mode 100644 tests/util/test_lrucache.py (limited to 'synapse/util') diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py new file mode 100644 index 0000000000..a45c673d32 --- /dev/null +++ b/synapse/util/lrucache.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class LruCache(object): + """Least-recently-used cache.""" + # TODO(mjark) Add hit/miss counters + # TODO(mjark) Add mutex for linked list for thread safety. + def __init__(self, max_size): + cache = {} + list_root = [] + list_root[:] = [list_root, list_root, None, None] + + PREV, NEXT, KEY, VALUE = 0, 1, 2, 3 + + def add_node(key, value): + prev_node = list_root + next_node = prev_node[NEXT] + node = [prev_node, next_node, key, value] + prev_node[NEXT] = node + next_node[PREV] = node + cache[key] = node + + def move_node_to_front(node): + prev_node = node[PREV] + next_node = node[NEXT] + prev_node[NEXT] = next_node + next_node[PREV] = prev_node + prev_node = list_root + next_node = prev_node[NEXT] + node[PREV] = prev_node + node[NEXT] = next_node + prev_node[NEXT] = node + next_node[PREV] = node + + def delete_node(node): + prev_node = node[PREV] + next_node = node[NEXT] + prev_node[NEXT] = next_node + next_node[PREV] = prev_node + cache.pop(node[KEY], None) + + def cache_get(key, default=None): + node = cache.get(key, None) + if node is not None: + move_node_to_front(node) + return node[VALUE] + else: + return default + + def cache_set(key, value): + node = cache.get(key, None) + if node is not None: + move_node_to_front(node) + node[VALUE] = value + else: + add_node(key, value) + if len(cache) > max_size: + delete_node(list_root[PREV]) + + def cache_set_default(key, value): + node = cache.get(key, None) + if node is not None: + return node[VALUE] + else: + add_node(key, value) + if len(cache) > max_size: + delete_node(list_root[PREV]) + return value + + def cache_pop(key, default=None): + node = cache.get(key, None) + if node: + delete_node(node) + return node[VALUE] + else: + return default + + self.sentinel = object() + self.get = cache_get + self.set = cache_set + self.setdefault = cache_set_default + self.pop = cache_pop + + def __getitem__(self, key): + result = self.get(key, self.sentinel) + if result is self.sentinel: + raise KeyError() + else: + return result + + def __setitem__(self, key, value): + self.set(key, value) + + def __delitem__(self, key, value): + result = self.pop(key, self.sentinel) + if result is self.sentinel: + raise KeyError() diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py new file mode 100644 index 0000000000..ab934bf928 --- /dev/null +++ b/tests/util/test_lrucache.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .. import unittest + +from synapse.util.lrucache import LruCache + +class LruCacheTestCase(unittest.TestCase): + + def test_get_set(self): + cache = LruCache(1) + cache["key"] = "value" + self.assertEquals(cache.get("key"), "value") + self.assertEquals(cache["key"], "value") + + def test_eviction(self): + cache = LruCache(2) + cache[1] = 1 + cache[2] = 2 + + self.assertEquals(cache.get(1), 1) + self.assertEquals(cache.get(2), 2) + + cache[3] = 3 + + self.assertEquals(cache.get(1), None) + self.assertEquals(cache.get(2), 2) + self.assertEquals(cache.get(3), 3) + + def test_setdefault(self): + cache = LruCache(1) + self.assertEquals(cache.setdefault("key", 1), 1) + self.assertEquals(cache.get("key"), 1) + self.assertEquals(cache.setdefault("key", 2), 1) + self.assertEquals(cache.get("key"), 1) + + def test_pop(self): + cache = LruCache(1) + cache["key"] = 1 + self.assertEquals(cache.pop("key"), 1) + self.assertEquals(cache.pop("key"), None) + + -- cgit 1.4.1 From ddb816cf60ca1b0c0f9bfab5df233a010ac309a3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 15:44:28 +0000 Subject: Don't unfreeze when using FreezeEvent.get_dict, as we are using a JSONEncoder that understands FrozenDict --- synapse/events/__init__.py | 4 ---- synapse/federation/persistence.py | 7 ++++--- synapse/handlers/federation.py | 15 ++++++++++++++- synapse/handlers/room.py | 4 +++- synapse/storage/__init__.py | 11 +++++++++-- synapse/util/frozenutils.py | 8 ++++++-- 6 files changed, 36 insertions(+), 13 deletions(-) (limited to 'synapse/util') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 8f0c6e959f..f4ec8cd18c 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -140,10 +140,6 @@ class FrozenEvent(EventBase): return e - def get_dict(self): - # We need to unfreeze what we return - return unfreeze(super(FrozenEvent, self).get_dict()) - def __str__(self): return self.__repr__() diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 8a1afc0ca5..76a9dcd777 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,7 +23,8 @@ from twisted.internet import defer from synapse.util.logutils import log_function -import simplejson as json +from syutil.jsonutil import encode_canonical_json + import logging @@ -70,7 +71,7 @@ class TransactionActions(object): transaction.transaction_id, transaction.origin, code, - json.dumps(response) + encode_canonical_json(response) ) @defer.inlineCallbacks @@ -100,5 +101,5 @@ class TransactionActions(object): transaction.transaction_id, transaction.destination, response_code, - json.dumps(response_dict) + encode_canonical_json(response_dict) ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0f9c82fd06..e36f0945ef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -23,6 +23,7 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor +from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, ) @@ -311,9 +312,12 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] builder = self.event_builder_factory.new( - event.get_pdu_json() + unfreeze(event.get_pdu_json()) ) + logger.info("Builder: %s", builder.get_pdu_json()) + logger.info("Content: %s", content) + handled_events = set() try: @@ -324,14 +328,21 @@ class FederationHandler(BaseHandler): if not hasattr(event, "signatures"): builder.signatures = {} + logger.info("Content befhahs: %s", builder.content) + add_hashes_and_signatures( builder, self.hs.hostname, self.hs.config.signing_key[0], ) + logger.info("Content aftet hah: %s", builder.content) + logger.info("Content pdu json: %s", builder.get_pdu_json()) + new_event = builder.build() + logger.info("Content after build: %s", new_event.content) + # Try the host we successfully got a response to /make_join/ # request first. try: @@ -340,6 +351,7 @@ class FederationHandler(BaseHandler): except ValueError: pass + logger.info(new_event.content) ret = yield self.replication_layer.send_join( target_hosts, new_event @@ -485,6 +497,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False + logger.info(event.content) context = yield self._handle_new_event(origin, event) logger.debug( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0369b907a5..c685991a9f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -403,7 +403,9 @@ class RoomMemberHandler(BaseHandler): "membership": Membership.JOIN, "content": content, }) + logger.info("Before build: %s", builder.get_pdu_json()) event, context = yield self._create_new_client_event(builder) + logger.info("After build: %s", event.get_dict()) yield self._do_join(event, context, room_hosts=hosts, do_auth=True) @@ -462,7 +464,7 @@ class RoomMemberHandler(BaseHandler): room_hosts, room_id, event.user_id, - event.get_dict()["content"], # FIXME To get a non-frozen dict + event.content, # FIXME To get a non-frozen dict context ) else: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 924ea89035..94603ef826 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -298,12 +298,16 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) + content = encode_canonical_json( + event.content + ).decode("UTF-8") + vals = { "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.get_dict()["content"]), + "content": content, "processed": True, "outlier": outlier, "depth": event.depth, @@ -323,7 +327,10 @@ class DataStore(RoomMemberStore, RoomStore, "prev_events", ] } - vals["unrecognized_keys"] = json.dumps(unrec) + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") try: self._simple_insert_txn( diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index a13a2015e4..9e10d37aec 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -21,6 +21,9 @@ def freeze(o): if t is dict: return frozendict({k: freeze(v) for k, v in o.items()}) + if t is frozendict: + return o + if t is str or t is unicode: return o @@ -33,10 +36,11 @@ def freeze(o): def unfreeze(o): - if isinstance(o, frozendict) or isinstance(o, dict): + t = type(o) + if t is dict or t is frozendict: return dict({k: unfreeze(v) for k, v in o.items()}) - if isinstance(o, basestring): + if t is str or t is unicode: return o try: -- cgit 1.4.1 From ef276e8770e19c66d14462aa325b9cb241121bb6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 16:48:05 +0000 Subject: Fix so timing out connections to actually work. --- synapse/federation/replication.py | 2 ++ synapse/util/__init__.py | 52 ++++++++++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 9 deletions(-) (limited to 'synapse/util') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e442c6c5d5..54a0c7ad8e 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer): self._order = 0 + self.hs = hs + def __str__(self): return "" % self.server_name diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 29f1344c5b..e77eba90ad 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -18,6 +18,9 @@ from synapse.util.logcontext import LoggingContext from twisted.internet import defer, reactor, task import time +import logging + +logger = logging.getLogger(__name__) class Clock(object): @@ -55,20 +58,51 @@ class Clock(object): timer.cancel() def time_bound_deferred(self, given_deferred, time_out): + if given_deferred.called: + return given_deferred + ret_deferred = defer.Deferred() - def timed_out(): - if not given_deferred.called: - given_deferred.cancel() + def timed_out_fn(): + try: ret_deferred.errback(RuntimeError("Timed out")) + except: + pass + + try: + given_deferred.cancel() + except: + pass + + timer = None + + def cancel(res): + try: + self.cancel_call_later(timer) + except: + pass + return res + + ret_deferred.addBoth(cancel) + + def sucess(res): + try: + ret_deferred.callback(res) + except: + pass + + return res + + def err(res): + try: + ret_deferred.errback(res) + except: + pass - timer = self.call_later(time_out, timed_out) + return res - def succeed(result): - self.cancel_call_later(timer) - ret_deferred.callback(result) + given_deferred.addCallbacks(callback=sucess, errback=err) - given_deferred.addCallback(succeed) - given_deferred.addErrback(ret_deferred.errback) + timer = self.call_later(time_out, timed_out_fn) return ret_deferred -- cgit 1.4.1