diff options
Diffstat (limited to 'synapse')
29 files changed, 194 insertions, 56 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index ff6680e7da..f5681fac20 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -274,6 +274,7 @@ def setup(): hs.get_pusherpool().start() + hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() if config.daemonize: diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index cd12349f67..74008347c3 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -19,7 +19,7 @@ from twisted.internet.protocol import Factory from twisted.internet import defer, reactor from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.logcontext import PreserveLoggingContext -import json +import simplejson as json import logging diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 8f0c6e959f..64e08223b0 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.frozenutils import freeze, unfreeze +from synapse.util.frozenutils import freeze class _EventInternalMetadata(object): @@ -140,10 +140,6 @@ class FrozenEvent(EventBase): return e - def get_dict(self): - # We need to unfreeze what we return - return unfreeze(super(FrozenEvent, self).get_dict()) - def __str__(self): return self.__repr__() diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 85c82a4623..76a9dcd777 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,7 +23,8 @@ from twisted.internet import defer from synapse.util.logutils import log_function -import json +from syutil.jsonutil import encode_canonical_json + import logging @@ -70,7 +71,7 @@ class TransactionActions(object): transaction.transaction_id, transaction.origin, code, - json.dumps(response) + encode_canonical_json(response) ) @defer.inlineCallbacks @@ -100,5 +101,5 @@ class TransactionActions(object): transaction.transaction_id, transaction.destination, response_code, - json.dumps(response_dict) + encode_canonical_json(response_dict) ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 9c9f8d525b..2ffb37aa18 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -20,7 +20,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.util.logutils import log_function import logging -import json +import simplejson as json import re diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0f9c82fd06..77c81fe2da 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -23,6 +23,7 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor +from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, ) @@ -311,7 +312,7 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] builder = self.event_builder_factory.new( - event.get_pdu_json() + unfreeze(event.get_pdu_json()) ) handled_events = set() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0369b907a5..914742d913 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -462,7 +462,7 @@ class RoomMemberHandler(BaseHandler): room_hosts, room_id, event.user_id, - event.get_dict()["content"], # FIXME To get a non-frozen dict + event.content, # FIXME To get a non-frozen dict context ) else: diff --git a/synapse/http/client.py b/synapse/http/client.py index 198f575cfa..d500e19c81 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -23,7 +23,7 @@ from twisted.web.http_headers import Headers from StringIO import StringIO -import json +import simplejson as json import logging import urllib diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 96f4250167..1927948001 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -33,7 +33,7 @@ from synapse.api.errors import ( from syutil.crypto.jsonsign import sign_json -import json +import simplejson as json import logging import urllib import urlparse diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 418a348a58..0659a1cb9b 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -22,7 +22,7 @@ import synapse.util.async import baserules import logging -import json +import simplejson as json import re logger = logging.getLogger(__name__) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 5a525befd7..7483d257bf 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -20,7 +20,7 @@ from httppusher import HttpPusher from synapse.push import PusherConfigException import logging -import json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index e2a9d1f6a7..8164effeee 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -4,7 +4,7 @@ from distutils.version import LooseVersion logger = logging.getLogger(__name__) REQUIREMENTS = { - "syutil==0.0.2": ["syutil"], + "syutil>=0.0.3": ["syutil"], "matrix_angular_sdk>=0.6.1": ["syweb>=0.6.1"], "Twisted==14.0.2": ["twisted==14.0.2"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 8ed7e2d669..420aa89f38 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -20,7 +20,7 @@ from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import RoomAlias from .base import ClientV1RestServlet, client_path_pattern -import json +import simplejson as json import logging diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 7116ac98e8..b2257b749d 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError from synapse.types import UserID from base import ClientV1RestServlet, client_path_pattern -import json +import simplejson as json class LoginRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 7feb4aadb1..78d4f2b128 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -21,7 +21,7 @@ from synapse.api.errors import SynapseError from synapse.types import UserID from .base import ClientV1RestServlet, client_path_pattern -import json +import simplejson as json import logging logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 15d6f3fc6c..1e77eb49cf 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -19,7 +19,7 @@ from twisted.internet import defer from .base import ClientV1RestServlet, client_path_pattern from synapse.types import UserID -import json +import simplejson as json class ProfileDisplaynameRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index c4e7dfcf0e..b012f31084 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -27,7 +27,7 @@ from synapse.push.rulekinds import ( PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP ) -import json +import simplejson as json class PushRuleRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index 80e9939b79..6045e86f34 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, Codes from synapse.push import PusherConfigException from .base import ClientV1RestServlet, client_path_pattern -import json +import simplejson as json class PusherRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index c0423c2d45..d3399c446b 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -25,7 +25,7 @@ from synapse.util.async import run_on_reactor from hashlib import sha1 import hmac -import json +import simplejson as json import logging import urllib diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 410f19ccf6..0346afb1b4 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.types import UserID, RoomID, RoomAlias from synapse.events.utils import serialize_event -import json +import simplejson as json import logging import urllib diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index 6ddc495d23..703250cea8 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -21,7 +21,7 @@ from synapse.types import UserID from ._base import client_v2_pattern -import json +import simplejson as json import logging diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py index 22e26e3cd5..e77a20fb2e 100644 --- a/synapse/rest/media/v0/content_repository.py +++ b/synapse/rest/media/v0/content_repository.py @@ -25,7 +25,7 @@ from twisted.web import server, resource from twisted.internet import defer import base64 -import json +import simplejson as json import logging import os import re diff --git a/synapse/state.py b/synapse/state.py index 54380b9e5c..98aaa2be53 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -43,14 +43,39 @@ AuthEventTypes = ( ) +SIZE_OF_CACHE = 1000 +EVICTION_TIMEOUT_SECONDS = 20 + + +class _StateCacheEntry(object): + def __init__(self, state, state_group, ts): + self.state = state + self.state_group = state_group + self.ts = ts + + class StateHandler(object): """ Responsible for doing state conflict resolution. """ def __init__(self, hs): + self.clock = hs.get_clock() self.store = hs.get_datastore() self.hs = hs + # dict of set of event_ids -> _StateCacheEntry. + self._state_cache = None + + def start_caching(self): + logger.debug("start_caching") + + self._state_cache = {} + + def f(): + self._prune_cache() + + self.clock.looping_call(f, 5*1000) + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): """ Returns the current state for the room as a list. This is done by @@ -70,13 +95,22 @@ class StateHandler(object): for e_id, _, _ in events ] - res = yield self.resolve_state_groups(event_ids) + cache = None + if self._state_cache is not None: + cache = self._state_cache.get(frozenset(event_ids), None) + + if cache: + cache.ts = self.clock.time_msec() + state = cache.state + else: + res = yield self.resolve_state_groups(event_ids) + state = res[1] if event_type: - defer.returnValue(res[1].get((event_type, state_key))) + defer.returnValue(state.get((event_type, state_key))) return - defer.returnValue(res[1]) + defer.returnValue(state) @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): @@ -177,6 +211,20 @@ class StateHandler(object): """ logger.debug("resolve_state_groups event_ids %s", event_ids) + if self._state_cache is not None: + cache = self._state_cache.get(frozenset(event_ids), None) + if cache and cache.state_group: + cache.ts = self.clock.time_msec() + prev_state = cache.state.get((event_type, state_key), None) + if prev_state: + prev_state = prev_state.event_id + prev_states = [prev_state] + else: + prev_states = [] + defer.returnValue( + (cache.state_group, cache.state, prev_states) + ) + state_groups = yield self.store.get_state_groups( event_ids ) @@ -200,6 +248,15 @@ class StateHandler(object): else: prev_states = [] + if self._state_cache is not None: + cache = _StateCacheEntry( + state=state, + state_group=name, + ts=self.clock.time_msec() + ) + + self._state_cache[frozenset(event_ids)] = cache + defer.returnValue((name, state, prev_states)) state = {} @@ -245,6 +302,15 @@ class StateHandler(object): new_state = unconflicted_state new_state.update(resolved_state) + if self._state_cache is not None: + cache = _StateCacheEntry( + state=new_state, + state_group=None, + ts=self.clock.time_msec() + ) + + self._state_cache[frozenset(event_ids)] = cache + defer.returnValue((None, new_state, prev_states)) @log_function @@ -328,3 +394,34 @@ class StateHandler(object): return -int(e.depth), hashlib.sha1(e.event_id).hexdigest() return sorted(events, key=key_func) + + def _prune_cache(self): + logger.debug( + "_prune_cache. before len: %d", + len(self._state_cache.keys()) + ) + + now = self.clock.time_msec() + + if len(self._state_cache.keys()) > SIZE_OF_CACHE: + sorted_entries = sorted( + self._state_cache.items(), + key=lambda k, v: v.ts, + ) + + for k, _ in sorted_entries[SIZE_OF_CACHE:]: + self._state_cache.pop(k) + + keys_to_delete = set() + + for key, cache_entry in self._state_cache.items(): + if now - cache_entry.ts > EVICTION_TIMEOUT_SECONDS*1000: + keys_to_delete.add(key) + + for k in keys_to_delete: + self._state_cache.pop(k) + + logger.debug( + "_prune_cache. after len: %d", + len(self._state_cache.keys()) + ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a33e2298f6..02b1f06854 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,7 +44,6 @@ from syutil.jsonutil import encode_canonical_json from synapse.crypto.event_signing import compute_event_reference_hash -import json import logging import os @@ -301,12 +300,16 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) + content = encode_canonical_json( + event.content + ).decode("UTF-8") + vals = { "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.get_dict()["content"]), + "content": content, "processed": True, "outlier": outlier, "depth": event.depth, @@ -326,7 +329,10 @@ class DataStore(RoomMemberStore, RoomStore, "prev_events", ] } - vals["unrecognized_keys"] = json.dumps(unrec) + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") try: self._simple_insert_txn( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 29fc334f45..be9934c66f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -24,7 +24,7 @@ from synapse.util.lrucache import LruCache from twisted.internet import defer import collections -import json +import simplejson as json import sys import time diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index e86eeced45..457a11fd02 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -17,7 +17,7 @@ from twisted.internet import defer from ._base import SQLBaseStore -import json +import simplejson as json class FilteringStore(SQLBaseStore): diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 620de71398..ae46b39cc1 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -20,7 +20,7 @@ from twisted.internet import defer import logging import copy -import json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c69dd995ce..779f9ce544 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,6 +35,11 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): + def __init__(self, *args, **kw): + super(RoomMemberStore, self).__init__(*args, **kw) + + self._user_rooms_cache = {} + def _store_room_member_txn(self, txn, event): """Store a room member in the database. """ @@ -98,6 +103,8 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, (event.room_id, domain)) + self.invalidate_rooms_for_user(target_user_id) + @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -240,28 +247,53 @@ class RoomMemberStore(SQLBaseStore): results = self._parse_events_txn(txn, rows) return results + # TODO(paul): Create a nice @cached decorator to do this + # @cached + # def get_foo(...) + # ... + # invalidate_foo = get_foo.invalidator + + @defer.inlineCallbacks + def get_rooms_for_user(self, user_id): + # TODO(paul): put some performance counters in here so we can easily + # track what impact this cache is having + if user_id in self._user_rooms_cache: + defer.returnValue(self._user_rooms_cache[user_id]) + + rooms = yield self.get_rooms_for_user_where_membership_is( + user_id, membership_list=[Membership.JOIN], + ) + + # TODO(paul): Consider applying a maximum size; just evict things at + # random, or consider LRU? + + self._user_rooms_cache[user_id] = rooms + defer.returnValue(rooms) + + def invalidate_rooms_for_user(self, user_id): + if user_id in self._user_rooms_cache: + del self._user_rooms_cache[user_id] + + @defer.inlineCallbacks def user_rooms_intersect(self, user_id_list): """ Checks whether all the users whose IDs are given in a list share a room. + + This is a "hot path" function that's called a lot, e.g. by presence for + generating the event stream. As such, it is implemented locally by + wrapping logic around heavily-cached database queries. """ - def interaction(txn): - user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list)) - sql = ( - "SELECT m.room_id FROM room_memberships as m " - "INNER JOIN current_state_events as c " - "ON m.event_id = c.event_id " - "WHERE m.membership = 'join' " - "AND (%(clause)s) " - # TODO(paul): We've got duplicate rows in the database somewhere - # so we have to DISTINCT m.user_id here - "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?" - ) % {"clause": user_list_clause} - - args = list(user_id_list) - args.append(len(user_id_list)) + if len(user_id_list) < 2: + defer.returnValue(True) - txn.execute(sql, args) + deferreds = [self.get_rooms_for_user(u) for u in user_id_list] + + results = yield defer.DeferredList(deferreds) + + # A list of sets of strings giving room IDs for each user + room_id_lists = [set([r.room_id for r in result[1]]) for result in results] - return len(txn.fetchall()) > 0 + # There isn't a setintersection(*list_of_sets) + ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0 - return self.runInteraction("user_rooms_intersect", interaction) + defer.returnValue(ret) diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index a13a2015e4..9e10d37aec 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -21,6 +21,9 @@ def freeze(o): if t is dict: return frozendict({k: freeze(v) for k, v in o.items()}) + if t is frozendict: + return o + if t is str or t is unicode: return o @@ -33,10 +36,11 @@ def freeze(o): def unfreeze(o): - if isinstance(o, frozendict) or isinstance(o, dict): + t = type(o) + if t is dict or t is frozendict: return dict({k: unfreeze(v) for k, v in o.items()}) - if isinstance(o, basestring): + if t is str or t is unicode: return o try: |