diff --git a/CHANGES.rst b/CHANGES.rst
index 113838b1c0..8835107594 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,61 +1,78 @@
-Changes in develop
-==================
+Changes in synapse v0.7.0 (2015-02-12)
+======================================
+
+* Add initial implementation of the query auth federation API, allowing
+ servers to agree on whether an event should be allowed or rejected.
+* Persist events we have rejected from federation, fixing the bug where
+ servers would keep requesting the same events.
+* Various federation performance improvements, including:
+
+ - Add in memory caches on queries such as:
+
+ * Computing the state of a room at a point in time, used for
+ authorization on federation requests.
+ * Fetching events from the database.
+ * User's room membership, used for authorizing presence updates.
+
+ - Upgraded JSON library to improve parsing and serialisation speeds.
- * pydenticon support -- adds dep on pydenticon
- * pylru
- * simplejson
+* Add default avatars to new user accounts using pydenticon library.
+* Correctly time out federation requests.
+* Retry federation requests against different servers.
+* Add support for push and push rules.
+* Add alpha versions of proposed new CSv2 APIs, including ``/sync`` API.
Changes in synapse 0.6.1 (2015-01-07)
=====================================
- * Major optimizations to improve performance of initial sync and event sending
- in large rooms (by up to 10x)
- * Media repository now includes a Content-Length header on media downloads.
- * Improve quality of thumbnails by changing resizing algorithm.
+* Major optimizations to improve performance of initial sync and event sending
+ in large rooms (by up to 10x)
+* Media repository now includes a Content-Length header on media downloads.
+* Improve quality of thumbnails by changing resizing algorithm.
Changes in synapse 0.6.0 (2014-12-16)
=====================================
- * Add new API for media upload and download that supports thumbnailing.
- * Replicate media uploads over multiple homeservers so media is always served
- to clients from their local homeserver. This obsoletes the
- --content-addr parameter and confusion over accessing content directly
- from remote homeservers.
- * Implement exponential backoff when retrying federation requests when
- sending to remote homeservers which are offline.
- * Implement typing notifications.
- * Fix bugs where we sent events with invalid signatures due to bugs where
- we incorrectly persisted events.
- * Improve performance of database queries involving retrieving events.
+* Add new API for media upload and download that supports thumbnailing.
+* Replicate media uploads over multiple homeservers so media is always served
+ to clients from their local homeserver. This obsoletes the
+ --content-addr parameter and confusion over accessing content directly
+ from remote homeservers.
+* Implement exponential backoff when retrying federation requests when
+ sending to remote homeservers which are offline.
+* Implement typing notifications.
+* Fix bugs where we sent events with invalid signatures due to bugs where
+ we incorrectly persisted events.
+* Improve performance of database queries involving retrieving events.
Changes in synapse 0.5.4a (2014-12-13)
======================================
- * Fix bug while generating the error message when a file path specified in
- the config doesn't exist.
+* Fix bug while generating the error message when a file path specified in
+ the config doesn't exist.
Changes in synapse 0.5.4 (2014-12-03)
=====================================
- * Fix presence bug where some rooms did not display presence updates for
- remote users.
- * Do not log SQL timing log lines when started with "-v"
- * Fix potential memory leak.
+* Fix presence bug where some rooms did not display presence updates for
+ remote users.
+* Do not log SQL timing log lines when started with "-v"
+* Fix potential memory leak.
Changes in synapse 0.5.3c (2014-12-02)
======================================
- * Change the default value for the `content_addr` option to use the HTTP
- listener, as by default the HTTPS listener will be using a self-signed
- certificate.
+* Change the default value for the `content_addr` option to use the HTTP
+ listener, as by default the HTTPS listener will be using a self-signed
+ certificate.
Changes in synapse 0.5.3 (2014-11-27)
=====================================
- * Fix bug that caused joining a remote room to fail if a single event was not
- signed correctly.
- * Fix bug which caused servers to continuously try and fetch events from other
- servers.
+* Fix bug that caused joining a remote room to fail if a single event was not
+ signed correctly.
+* Fix bug which caused servers to continuously try and fetch events from other
+ servers.
Changes in synapse 0.5.2 (2014-11-26)
=====================================
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 0f81f3e11f..6ea348acc3 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -1,3 +1,17 @@
+Upgrading to v0.7.0
+===================
+
+New dependencies are:
+
+- pydenticon
+- simplejson
+- syutil
+- matrix-angular-sdk
+
+To pull in these dependencies in a virtual env, run::
+
+ python synapse/python_dependencies.py | xargs -n 1 pip install
+
Upgrading to v0.6.0
===================
diff --git a/scripts/federation_client.py b/scripts/federation_client.py
index 3139c61761..ea62dceb36 100644
--- a/scripts/federation_client.py
+++ b/scripts/federation_client.py
@@ -97,8 +97,11 @@ def lookup(destination, path):
if ":" in destination:
return "https://%s%s" % (destination, path)
else:
- srv = srvlookup.lookup("matrix", "tcp", destination)[0]
- return "https://%s:%d%s" % (srv.host, srv.port, path)
+ try:
+ srv = srvlookup.lookup("matrix", "tcp", destination)[0]
+ return "https://%s:%d%s" % (srv.host, srv.port, path)
+ except:
+ return "https://%s:%d%s" % (destination, 8448, path)
def get_json(origin_name, origin_key, destination, path):
request_json = {
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 8fe8df4edb..1060fcc866 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.6.1f"
+__version__ = "0.7.0c"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index ff29d785db..27b478a1c3 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -94,7 +94,9 @@ class SynapseHomeServer(HomeServer):
"sqlite3", self.get_db_name(),
check_same_thread=False,
cp_min=1,
- cp_max=1
+ cp_max=1,
+ cp_openfun=prepare_database, # Prepare the database for each conn
+ # so that :memory: sqlite works
)
def create_resource_tree(self, web_client, redirect_root_to_web_client):
@@ -257,14 +259,6 @@ def setup():
logger.info("Database prepared in %s.", db_name)
- db_pool = hs.get_db_pool()
-
- if db_name == ":memory:":
- # Memory databases will need to be setup each time they are opened.
- reactor.callWhenRunning(
- db_pool.runWithConnection, prepare_database
- )
-
if config.manhole:
f = twisted.manhole.telnet.ShellFactory()
f.username = "matrix"
@@ -275,10 +269,11 @@ def setup():
bind_port = config.bind_port
if config.no_tls:
bind_port = None
+
hs.start_listening(bind_port, config.unsecure_port)
hs.get_pusherpool().start()
-
+ hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling()
if config.daemonize:
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index cd12349f67..74008347c3 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -19,7 +19,7 @@ from twisted.internet.protocol import Factory
from twisted.internet import defer, reactor
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.logcontext import PreserveLoggingContext
-import json
+import simplejson as json
import logging
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 8f0c6e959f..64e08223b0 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.frozenutils import freeze, unfreeze
+from synapse.util.frozenutils import freeze
class _EventInternalMetadata(object):
@@ -140,10 +140,6 @@ class FrozenEvent(EventBase):
return e
- def get_dict(self):
- # We need to unfreeze what we return
- return unfreeze(super(FrozenEvent, self).get_dict())
-
def __str__(self):
return self.__repr__()
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a990aec4fd..21a763214b 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -50,8 +50,11 @@ class FederationBase(object):
Returns:
Deferred : A list of PDUs that have valid signatures and hashes.
"""
+
signed_pdus = []
- for pdu in pdus:
+
+ @defer.inlineCallbacks
+ def do(pdu):
try:
new_pdu = yield self._check_sigs_and_hash(pdu)
signed_pdus.append(new_pdu)
@@ -61,25 +64,37 @@ class FederationBase(object):
# Check local db.
new_pdu = yield self.store.get_event(
pdu.event_id,
- allow_rejected=True
+ allow_rejected=True,
+ allow_none=True,
)
if new_pdu:
signed_pdus.append(new_pdu)
- continue
+ return
# Check pdu.origin
if pdu.origin != origin:
- new_pdu = yield self.get_pdu(
- destinations=[pdu.origin],
- event_id=pdu.event_id,
- outlier=outlier,
- )
-
- if new_pdu:
- signed_pdus.append(new_pdu)
- continue
+ try:
+ new_pdu = yield self.get_pdu(
+ destinations=[pdu.origin],
+ event_id=pdu.event_id,
+ outlier=outlier,
+ )
+
+ if new_pdu:
+ signed_pdus.append(new_pdu)
+ return
+ except:
+ pass
+
+ logger.warn(
+ "Failed to find copy of %s with valid signature",
+ pdu.event_id,
+ )
- logger.warn("Failed to find copy of %s with valid signature")
+ yield defer.gatherResults(
+ [do(pdu) for pdu in pdus],
+ consumeErrors=True
+ )
defer.returnValue(signed_pdus)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index b23f72c7fa..9f5c98694c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -411,9 +411,12 @@ class FederationServer(FederationBase):
"_handle_new_pdu getting state for %s",
pdu.room_id
)
- state, auth_chain = yield self.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
+ try:
+ state, auth_chain = yield self.get_state_for_room(
+ origin, pdu.room_id, pdu.event_id,
+ )
+ except:
+ logger.warn("Failed to get state for event: %s", pdu.event_id)
ret = yield self.handler.on_receive_pdu(
origin,
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 85c82a4623..76a9dcd777 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -23,7 +23,8 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
-import json
+from syutil.jsonutil import encode_canonical_json
+
import logging
@@ -70,7 +71,7 @@ class TransactionActions(object):
transaction.transaction_id,
transaction.origin,
code,
- json.dumps(response)
+ encode_canonical_json(response)
)
@defer.inlineCallbacks
@@ -100,5 +101,5 @@ class TransactionActions(object):
transaction.transaction_id,
transaction.destination,
response_code,
- json.dumps(response_dict)
+ encode_canonical_json(response_dict)
)
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index e442c6c5d5..54a0c7ad8e 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer):
self._order = 0
+ self.hs = hs
+
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 9c9f8d525b..2ffb37aa18 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -20,7 +20,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.util.logutils import log_function
import logging
-import json
+import simplejson as json
import re
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0f9c82fd06..d667d358ab 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -23,6 +23,7 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
+from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
)
@@ -311,7 +312,7 @@ class FederationHandler(BaseHandler):
self.room_queues[room_id] = []
builder = self.event_builder_factory.new(
- event.get_pdu_json()
+ unfreeze(event.get_pdu_json())
)
handled_events = set()
@@ -857,6 +858,40 @@ class FederationHandler(BaseHandler):
# Do auth conflict res.
logger.debug("Different auth: %s", different_auth)
+ different_events = yield defer.gatherResults(
+ [
+ self.store.get_event(
+ d,
+ allow_none=True,
+ allow_rejected=False,
+ )
+ for d in different_auth
+ if d in have_events and not have_events[d]
+ ],
+ consumeErrors=True
+ )
+
+ if different_events:
+ local_view = dict(auth_events)
+ remote_view = dict(auth_events)
+ remote_view.update({
+ (d.type, d.state_key) for d in different_events
+ })
+
+ new_state, prev_state = self.state.resolve_events(
+ [local_view, remote_view],
+ event
+ )
+
+ auth_events.update(new_state)
+
+ current_state = set(e.event_id for e in auth_events.values())
+ different_auth = event_auth_events - current_state
+
+ context.current_state.update(auth_events)
+ context.state_group = None
+
+ if different_auth and not event.internal_metadata.is_outlier():
# Only do auth resolution if we have something new to say.
# We can't rove an auth failure.
do_resolution = False
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3355adefcf..7b9685be7f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -372,6 +372,7 @@ class MessageHandler(BaseHandler):
room_members = [
m for m in current_state.values()
if m.type == EventTypes.Member
+ and m.content["membership"] == Membership.JOIN
]
presence_handler = self.hs.get_handlers().presence_handler
@@ -384,17 +385,10 @@ class MessageHandler(BaseHandler):
as_event=True,
)
presence.append(member_presence)
- except SynapseError as e:
- if e.code == 404:
- # FIXME: We are doing this as a warn since this gets hit a
- # lot and spams the logs. Why is this happening?
- logger.warn(
- "Failed to get member presence of %r", m.user_id
- )
- else:
- logger.exception(
- "Failed to get member presence of %r", m.user_id
- )
+ except SynapseError:
+ logger.exception(
+ "Failed to get member presence of %r", m.user_id
+ )
time_now = self.clock.time_msec()
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 0369b907a5..914742d913 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -462,7 +462,7 @@ class RoomMemberHandler(BaseHandler):
room_hosts,
room_id,
event.user_id,
- event.get_dict()["content"], # FIXME To get a non-frozen dict
+ event.content, # FIXME To get a non-frozen dict
context
)
else:
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 7b23116556..e46e7db146 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -15,6 +15,8 @@
from synapse.api.errors import CodeMessageException
from synapse.http.agent_name import AGENT_NAME
+from syutil.jsonutil import encode_canonical_json
+
from twisted.internet import defer, reactor
from twisted.web.client import (
Agent, readBody, FileBodyProducer, PartialDownloadError
@@ -23,7 +25,7 @@ from twisted.web.http_headers import Headers
from StringIO import StringIO
-import json
+import simplejson as json
import logging
import urllib
@@ -64,7 +66,7 @@ class SimpleHttpClient(object):
@defer.inlineCallbacks
def post_json_get_json(self, uri, post_json):
- json_str = json.dumps(post_json)
+ json_str = encode_canonical_json(post_json)
logger.info("HTTP POST %s -> %s", json_str, uri)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 056d446e42..1927948001 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -33,7 +33,7 @@ from synapse.api.errors import (
from syutil.crypto.jsonsign import sign_json
-import json
+import simplejson as json
import logging
import urllib
import urlparse
@@ -79,6 +79,7 @@ class MatrixFederationHttpClient(object):
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.agent = MatrixFederationHttpAgent(reactor)
+ self.clock = hs.get_clock()
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
@@ -118,7 +119,7 @@ class MatrixFederationHttpClient(object):
try:
with PreserveLoggingContext():
- response = yield self.agent.request(
+ request_deferred = self.agent.request(
destination,
endpoint,
method,
@@ -129,6 +130,11 @@ class MatrixFederationHttpClient(object):
producer
)
+ response = yield self.clock.time_bound_deferred(
+ request_deferred,
+ time_out=60,
+ )
+
logger.debug("Got response to %s", method)
break
except Exception as e:
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 418a348a58..0659a1cb9b 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -22,7 +22,7 @@ import synapse.util.async
import baserules
import logging
-import json
+import simplejson as json
import re
logger = logging.getLogger(__name__)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 5a525befd7..90babd7224 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,8 +19,10 @@ from twisted.internet import defer
from httppusher import HttpPusher
from synapse.push import PusherConfigException
+from syutil.jsonutil import encode_canonical_json
+
import logging
-import json
+import simplejson as json
logger = logging.getLogger(__name__)
@@ -96,7 +98,7 @@ class PusherPool:
pushkey=pushkey,
pushkey_ts=self.hs.get_clock().time_msec(),
lang=lang,
- data=json.dumps(data)
+ data=encode_canonical_json(data).decode("UTF-8"),
)
self._refresh_pusher((app_id, pushkey))
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index e2a9d1f6a7..ec78fc3627 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -4,8 +4,8 @@ from distutils.version import LooseVersion
logger = logging.getLogger(__name__)
REQUIREMENTS = {
- "syutil==0.0.2": ["syutil"],
- "matrix_angular_sdk>=0.6.1": ["syweb>=0.6.1"],
+ "syutil>=0.0.3": ["syutil"],
+ "matrix_angular_sdk>=0.6.2": ["syweb>=0.6.2"],
"Twisted==14.0.2": ["twisted==14.0.2"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
@@ -26,13 +26,13 @@ def github_link(project, version, egg):
DEPENDENCY_LINKS = [
github_link(
project="matrix-org/syutil",
- version="v0.0.2",
- egg="syutil-0.0.2",
+ version="v0.0.3",
+ egg="syutil-0.0.3",
),
github_link(
project="matrix-org/matrix-angular-sdk",
- version="v0.6.1",
- egg="matrix_angular_sdk-0.6.1",
+ version="v0.6.2",
+ egg="matrix_angular_sdk-0.6.2",
),
github_link(
project="pyca/pynacl",
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index f126798597..6758a888b3 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -20,7 +20,7 @@ from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import RoomAlias
from .base import ClientV1RestServlet, client_path_pattern
-import json
+import simplejson as json
import logging
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 7116ac98e8..b2257b749d 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError
from synapse.types import UserID
from base import ClientV1RestServlet, client_path_pattern
-import json
+import simplejson as json
class LoginRestServlet(ClientV1RestServlet):
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index 7feb4aadb1..78d4f2b128 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -21,7 +21,7 @@ from synapse.api.errors import SynapseError
from synapse.types import UserID
from .base import ClientV1RestServlet, client_path_pattern
-import json
+import simplejson as json
import logging
logger = logging.getLogger(__name__)
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index 15d6f3fc6c..1e77eb49cf 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -19,7 +19,7 @@ from twisted.internet import defer
from .base import ClientV1RestServlet, client_path_pattern
from synapse.types import UserID
-import json
+import simplejson as json
class ProfileDisplaynameRestServlet(ClientV1RestServlet):
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index c4e7dfcf0e..b012f31084 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -27,7 +27,7 @@ from synapse.push.rulekinds import (
PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
)
-import json
+import simplejson as json
class PushRuleRestServlet(ClientV1RestServlet):
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 80e9939b79..6045e86f34 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, Codes
from synapse.push import PusherConfigException
from .base import ClientV1RestServlet, client_path_pattern
-import json
+import simplejson as json
class PusherRestServlet(ClientV1RestServlet):
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 1ab32b53ea..8d2115082b 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -25,7 +25,7 @@ from synapse.util.async import run_on_reactor
from hashlib import sha1
import hmac
-import json
+import simplejson as json
import logging
import urllib
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 410f19ccf6..0346afb1b4 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.types import UserID, RoomID, RoomAlias
from synapse.events.utils import serialize_event
-import json
+import simplejson as json
import logging
import urllib
diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py
index 6ddc495d23..703250cea8 100644
--- a/synapse/rest/client/v2_alpha/filter.py
+++ b/synapse/rest/client/v2_alpha/filter.py
@@ -21,7 +21,7 @@ from synapse.types import UserID
from ._base import client_v2_pattern
-import json
+import simplejson as json
import logging
diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py
index 22e26e3cd5..e77a20fb2e 100644
--- a/synapse/rest/media/v0/content_repository.py
+++ b/synapse/rest/media/v0/content_repository.py
@@ -25,7 +25,7 @@ from twisted.web import server, resource
from twisted.internet import defer
import base64
-import json
+import simplejson as json
import logging
import os
import re
diff --git a/synapse/state.py b/synapse/state.py
index 54380b9e5c..fe5f3dc84b 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -43,14 +43,39 @@ AuthEventTypes = (
)
+SIZE_OF_CACHE = 1000
+EVICTION_TIMEOUT_SECONDS = 20
+
+
+class _StateCacheEntry(object):
+ def __init__(self, state, state_group, ts):
+ self.state = state
+ self.state_group = state_group
+ self.ts = ts
+
+
class StateHandler(object):
""" Responsible for doing state conflict resolution.
"""
def __init__(self, hs):
+ self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.hs = hs
+ # dict of set of event_ids -> _StateCacheEntry.
+ self._state_cache = None
+
+ def start_caching(self):
+ logger.debug("start_caching")
+
+ self._state_cache = {}
+
+ def f():
+ self._prune_cache()
+
+ self.clock.looping_call(f, 5*1000)
+
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
""" Returns the current state for the room as a list. This is done by
@@ -70,13 +95,22 @@ class StateHandler(object):
for e_id, _, _ in events
]
- res = yield self.resolve_state_groups(event_ids)
+ cache = None
+ if self._state_cache is not None:
+ cache = self._state_cache.get(frozenset(event_ids), None)
+
+ if cache:
+ cache.ts = self.clock.time_msec()
+ state = cache.state
+ else:
+ res = yield self.resolve_state_groups(event_ids)
+ state = res[1]
if event_type:
- defer.returnValue(res[1].get((event_type, state_key)))
+ defer.returnValue(state.get((event_type, state_key)))
return
- defer.returnValue(res[1])
+ defer.returnValue(state)
@defer.inlineCallbacks
def compute_event_context(self, event, old_state=None):
@@ -177,6 +211,20 @@ class StateHandler(object):
"""
logger.debug("resolve_state_groups event_ids %s", event_ids)
+ if self._state_cache is not None:
+ cache = self._state_cache.get(frozenset(event_ids), None)
+ if cache and cache.state_group:
+ cache.ts = self.clock.time_msec()
+ prev_state = cache.state.get((event_type, state_key), None)
+ if prev_state:
+ prev_state = prev_state.event_id
+ prev_states = [prev_state]
+ else:
+ prev_states = []
+ defer.returnValue(
+ (cache.state_group, cache.state, prev_states)
+ )
+
state_groups = yield self.store.get_state_groups(
event_ids
)
@@ -200,15 +248,48 @@ class StateHandler(object):
else:
prev_states = []
+ if self._state_cache is not None:
+ cache = _StateCacheEntry(
+ state=state,
+ state_group=name,
+ ts=self.clock.time_msec()
+ )
+
+ self._state_cache[frozenset(event_ids)] = cache
+
defer.returnValue((name, state, prev_states))
+ new_state, prev_states = self._resolve_events(
+ state_groups.values(), event_type, state_key
+ )
+
+ if self._state_cache is not None:
+ cache = _StateCacheEntry(
+ state=new_state,
+ state_group=None,
+ ts=self.clock.time_msec()
+ )
+
+ self._state_cache[frozenset(event_ids)] = cache
+
+ defer.returnValue((None, new_state, prev_states))
+
+ def resolve_events(self, state_sets, event):
+ if event.is_state():
+ return self._resolve_events(
+ state_sets, event.type, event.state_key
+ )
+ else:
+ return self._resolve_events(state_sets)
+
+ def _resolve_events(self, state_sets, event_type=None, state_key=""):
state = {}
- for group, g_state in state_groups.items():
- for s in g_state:
+ for st in state_sets:
+ for e in st:
state.setdefault(
- (s.type, s.state_key),
+ (e.type, e.state_key),
{}
- )[s.event_id] = s
+ )[e.event_id] = e
unconflicted_state = {
k: v.values()[0] for k, v in state.items()
@@ -245,7 +326,7 @@ class StateHandler(object):
new_state = unconflicted_state
new_state.update(resolved_state)
- defer.returnValue((None, new_state, prev_states))
+ return new_state, prev_states
@log_function
def _resolve_state_events(self, conflicted_state, auth_events):
@@ -328,3 +409,34 @@ class StateHandler(object):
return -int(e.depth), hashlib.sha1(e.event_id).hexdigest()
return sorted(events, key=key_func)
+
+ def _prune_cache(self):
+ logger.debug(
+ "_prune_cache. before len: %d",
+ len(self._state_cache.keys())
+ )
+
+ now = self.clock.time_msec()
+
+ if len(self._state_cache.keys()) > SIZE_OF_CACHE:
+ sorted_entries = sorted(
+ self._state_cache.items(),
+ key=lambda k, v: v.ts,
+ )
+
+ for k, _ in sorted_entries[SIZE_OF_CACHE:]:
+ self._state_cache.pop(k)
+
+ keys_to_delete = set()
+
+ for key, cache_entry in self._state_cache.items():
+ if now - cache_entry.ts > EVICTION_TIMEOUT_SECONDS*1000:
+ keys_to_delete.add(key)
+
+ for k in keys_to_delete:
+ self._state_cache.pop(k)
+
+ logger.debug(
+ "_prune_cache. after len: %d",
+ len(self._state_cache.keys())
+ )
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4b618e0c65..c6e96b842f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -45,7 +45,6 @@ from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
-import json
import logging
import os
@@ -304,12 +303,16 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
+ content = encode_canonical_json(
+ event.content
+ ).decode("UTF-8")
+
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
- "content": json.dumps(event.get_dict()["content"]),
+ "content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
@@ -329,7 +332,10 @@ class DataStore(RoomMemberStore, RoomStore,
"prev_events",
]
}
- vals["unrecognized_keys"] = json.dumps(unrec)
+
+ vals["unrecognized_keys"] = encode_canonical_json(
+ unrec
+ ).decode("UTF-8")
try:
self._simple_insert_txn(
@@ -634,10 +640,13 @@ def prepare_database(db_conn):
c.executescript(sql_script)
db_conn.commit()
+ else:
+ logger.info("Database is at version %r", user_version)
else:
sql_script = "BEGIN TRANSACTION;\n"
for sql_loc in SCHEMAS:
+ logger.debug("Applying schema %r", sql_loc)
sql_script += read_schema(sql_loc)
sql_script += "\n"
sql_script += "COMMIT TRANSACTION;"
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 29fc334f45..be9934c66f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -24,7 +24,7 @@ from synapse.util.lrucache import LruCache
from twisted.internet import defer
import collections
-import json
+import simplejson as json
import sys
import time
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0cbcdd1b55..3fbc090224 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -55,17 +55,16 @@ class EventFederationStore(SQLBaseStore):
results = set()
base_sql = (
- "SELECT auth_id FROM event_auth WHERE %s"
+ "SELECT auth_id FROM event_auth WHERE event_id = ?"
)
front = set(event_ids)
while front:
- sql = base_sql % (
- " OR ".join(["event_id=?"] * len(front)),
- )
-
- txn.execute(sql, list(front))
- front = [r[0] for r in txn.fetchall()]
+ new_front = set()
+ for f in front:
+ txn.execute(base_sql, (f,))
+ new_front.update([r[0] for r in txn.fetchall()])
+ front = new_front
results.update(front)
return list(results)
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index e86eeced45..457a11fd02 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from ._base import SQLBaseStore
-import json
+import simplejson as json
class FilteringStore(SQLBaseStore):
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 620de71398..ae46b39cc1 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -20,7 +20,7 @@ from twisted.internet import defer
import logging
import copy
-import json
+import simplejson as json
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 6542f8e4f8..750b17a45f 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -82,38 +82,45 @@ class RoomStore(SQLBaseStore):
"topic" key if one is set, and a "name" key if one is set
"""
- topic_subquery = (
- "SELECT topics.event_id as event_id, "
- "topics.room_id as room_id, topic "
- "FROM topics "
- "INNER JOIN current_state_events as c "
- "ON c.event_id = topics.event_id "
- )
+ def f(txn):
+ topic_subquery = (
+ "SELECT topics.event_id as event_id, "
+ "topics.room_id as room_id, topic "
+ "FROM topics "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = topics.event_id "
+ )
- name_subquery = (
- "SELECT room_names.event_id as event_id, "
- "room_names.room_id as room_id, name "
- "FROM room_names "
- "INNER JOIN current_state_events as c "
- "ON c.event_id = room_names.event_id "
- )
+ name_subquery = (
+ "SELECT room_names.event_id as event_id, "
+ "room_names.room_id as room_id, name "
+ "FROM room_names "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = room_names.event_id "
+ )
- # We use non printing ascii character US () as a seperator
- sql = (
- "SELECT r.room_id, n.name, t.topic, "
- "group_concat(a.room_alias, '') "
- "FROM rooms AS r "
- "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
- "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
- "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
- "WHERE r.is_public = ? "
- "GROUP BY r.room_id "
- ) % {
- "topic": topic_subquery,
- "name": name_subquery,
- }
-
- rows = yield self._execute(None, sql, is_public)
+ # We use non printing ascii character US () as a seperator
+ sql = (
+ "SELECT r.room_id, n.name, t.topic, "
+ "group_concat(a.room_alias, '') "
+ "FROM rooms AS r "
+ "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+ "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+ "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+ "WHERE r.is_public = ? "
+ "GROUP BY r.room_id "
+ ) % {
+ "topic": topic_subquery,
+ "name": name_subquery,
+ }
+
+ c = txn.execute(sql, (is_public,))
+
+ return c.fetchall()
+
+ rows = yield self.runInteraction(
+ "get_rooms", f
+ )
ret = [
{
diff --git a/synapse/storage/schema/delta/v12.sql b/synapse/storage/schema/delta/v12.sql
index 302d958dbf..b87ef1fe79 100644
--- a/synapse/storage/schema/delta/v12.sql
+++ b/synapse/storage/schema/delta/v12.sql
@@ -63,3 +63,5 @@ CREATE TABLE IF NOT EXISTS user_filters(
CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
user_id, filter_id
);
+
+PRAGMA user_version = 12;
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index fee76b0a9b..e77eba90ad 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -15,9 +15,12 @@
from synapse.util.logcontext import LoggingContext
-from twisted.internet import reactor, task
+from twisted.internet import defer, reactor, task
import time
+import logging
+
+logger = logging.getLogger(__name__)
class Clock(object):
@@ -53,3 +56,53 @@ class Clock(object):
def cancel_call_later(self, timer):
timer.cancel()
+
+ def time_bound_deferred(self, given_deferred, time_out):
+ if given_deferred.called:
+ return given_deferred
+
+ ret_deferred = defer.Deferred()
+
+ def timed_out_fn():
+ try:
+ ret_deferred.errback(RuntimeError("Timed out"))
+ except:
+ pass
+
+ try:
+ given_deferred.cancel()
+ except:
+ pass
+
+ timer = None
+
+ def cancel(res):
+ try:
+ self.cancel_call_later(timer)
+ except:
+ pass
+ return res
+
+ ret_deferred.addBoth(cancel)
+
+ def sucess(res):
+ try:
+ ret_deferred.callback(res)
+ except:
+ pass
+
+ return res
+
+ def err(res):
+ try:
+ ret_deferred.errback(res)
+ except:
+ pass
+
+ return res
+
+ given_deferred.addCallbacks(callback=sucess, errback=err)
+
+ timer = self.call_later(time_out, timed_out_fn)
+
+ return ret_deferred
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index a13a2015e4..9e10d37aec 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -21,6 +21,9 @@ def freeze(o):
if t is dict:
return frozendict({k: freeze(v) for k, v in o.items()})
+ if t is frozendict:
+ return o
+
if t is str or t is unicode:
return o
@@ -33,10 +36,11 @@ def freeze(o):
def unfreeze(o):
- if isinstance(o, frozendict) or isinstance(o, dict):
+ t = type(o)
+ if t is dict or t is frozendict:
return dict({k: unfreeze(v) for k, v in o.items()})
- if isinstance(o, basestring):
+ if t is str or t is unicode:
return o
try:
diff --git a/tests/test_state.py b/tests/test_state.py
index 019e794aa2..fea25f7021 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -21,6 +21,8 @@ from synapse.api.auth import Auth
from synapse.api.constants import EventTypes, Membership
from synapse.state import StateHandler
+from .utils import MockClock
+
from mock import Mock
@@ -138,10 +140,13 @@ class StateTestCase(unittest.TestCase):
"add_event_hashes",
]
)
- hs = Mock(spec=["get_datastore", "get_auth", "get_state_handler"])
+ hs = Mock(spec=[
+ "get_datastore", "get_auth", "get_state_handler", "get_clock",
+ ])
hs.get_datastore.return_value = self.store
hs.get_state_handler.return_value = None
hs.get_auth.return_value = Auth(hs)
+ hs.get_clock.return_value = MockClock()
self.state = StateHandler(hs)
self.event_id = 0
|