diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/api/events/factory.py | 4 | ||||
-rw-r--r-- | synapse/config/server.py | 2 | ||||
-rw-r--r-- | synapse/crypto/context_factory.py | 5 | ||||
-rw-r--r-- | synapse/crypto/keyring.py | 1 | ||||
-rw-r--r-- | synapse/federation/pdu_codec.py | 4 | ||||
-rw-r--r-- | synapse/federation/persistence.py | 2 | ||||
-rw-r--r-- | synapse/federation/replication.py | 6 | ||||
-rw-r--r-- | synapse/federation/transport.py | 19 | ||||
-rw-r--r-- | synapse/federation/units.py | 25 | ||||
-rw-r--r-- | synapse/handlers/message.py | 16 | ||||
-rw-r--r-- | synapse/handlers/register.py | 3 | ||||
-rw-r--r-- | synapse/http/client.py | 44 | ||||
-rw-r--r-- | synapse/rest/presence.py | 4 | ||||
-rw-r--r-- | synapse/rest/voip.py | 2 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 11 | ||||
-rw-r--r-- | synapse/storage/_base.py | 3 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v6.sql | 31 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 13 |
19 files changed, 138 insertions, 59 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index a340a5db66..7067188c5b 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a synapse home server. """ -__version__ = "0.3.4" +__version__ = "0.4.1" diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index c6d1151cac..06f3bf232b 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -66,8 +66,8 @@ class EventFactory(object): if "event_id" not in kwargs: kwargs["event_id"] = self.create_event_id() - if "ts" not in kwargs: - kwargs["ts"] = int(self.clock.time_msec()) + if "origin_server_ts" not in kwargs: + kwargs["origin_server_ts"] = int(self.clock.time_msec()) # The "age" key is a delta timestamp that should be converted into an # absolute timestamp the minute we see it. diff --git a/synapse/config/server.py b/synapse/config/server.py index d9d8d0e14e..9332e4acd7 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -94,7 +94,7 @@ class ServerConfig(Config): with open(args.signing_key_path, "w") as signing_key_file: syutil.crypto.signing_key.write_signing_keys( signing_key_file, - (syutil.crypto.SigningKey.generate("auto"),), + (syutil.crypto.signing_key.generate_singing_key("auto"),), ) else: signing_keys = cls.read_file(args.signing_key_path, "signing_key") diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index f86bd19255..f402c795bb 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -16,6 +16,9 @@ from twisted.internet import ssl from OpenSSL import SSL from twisted.internet._sslverify import _OpenSSLECCurve, _defaultCurveName +import logging + +logger = logging.getLogger(__name__) class ServerContextFactory(ssl.ContextFactory): """Factory for PyOpenSSL SSL contexts that are used to handle incoming @@ -31,7 +34,7 @@ class ServerContextFactory(ssl.ContextFactory): _ecCurve = _OpenSSLECCurve(_defaultCurveName) _ecCurve.addECKeyToContext(context) except: - pass + logger.exception("Failed to enable eliptic curve for TLS") context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) context.use_certificate(config.tls_certificate) context.use_privatekey(config.tls_private_key) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 015f76ebe3..2440d604c3 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -38,6 +38,7 @@ class Keyring(object): @defer.inlineCallbacks def verify_json_for_server(self, server_name, json_object): + logger.debug("Verifying for %s", server_name) key_ids = signature_ids(json_object, server_name) if not key_ids: raise SynapseError( diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index cef61108dd..e8180d94fd 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -96,7 +96,7 @@ class PduCodec(object): if k not in ["event_id", "room_id", "type", "prev_events"] }) - if "ts" not in kwargs: - kwargs["ts"] = int(self.clock.time_msec()) + if "origin_server_ts" not in kwargs: + kwargs["origin_server_ts"] = int(self.clock.time_msec()) return Pdu(**kwargs) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index de36a80e41..7043fcc504 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -157,7 +157,7 @@ class TransactionActions(object): transaction.prev_ids = yield self.store.prep_send_transaction( transaction.transaction_id, transaction.destination, - transaction.ts, + transaction.origin_server_ts, [(p["pdu_id"], p["origin"]) for p in transaction.pdus] ) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 8c7d510ef6..d901837d0a 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -321,7 +321,7 @@ class ReplicationLayer(object): if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu(edu.origin, edu.edu_type, edu.content) + self.received_edu(transaction.origin, edu.edu_type, edu.content) results = yield defer.DeferredList(dl) @@ -474,7 +474,7 @@ class ReplicationLayer(object): return Transaction( origin=self.server_name, pdus=pdus, - ts=int(self._clock.time_msec()), + origin_server_ts=int(self._clock.time_msec()), destination=None, ) @@ -654,7 +654,7 @@ class _TransactionQueue(object): logger.debug("TX [%s] Persisting transaction...", destination) transaction = Transaction.create_new( - ts=self._clock.time_msec(), + origin_server_ts=self._clock.time_msec(), transaction_id=str(self._next_txn_id), origin=self.server_name, destination=destination, diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index de64702e2f..7f01b4faaf 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -301,6 +301,11 @@ class TransportLayer(object): auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") + if not auth_headers: + raise SynapseError( + 401, "Missing Authorization headers", Codes.UNAUTHORIZED, + ) + for auth in auth_headers: if auth.startswith("X-Matrix"): (origin, key, sig) = parse_auth_header(auth) @@ -319,13 +324,13 @@ class TransportLayer(object): def _with_authentication(self, handler): @defer.inlineCallbacks def new_handler(request, *args, **kwargs): - (origin, content) = yield self._authenticate_request(request) try: + (origin, content) = yield self._authenticate_request(request) response = yield handler( origin, content, request.args, *args, **kwargs ) except: - logger.exception("Callback failed") + logger.exception("_authenticate_request failed") raise defer.returnValue(response) return new_handler @@ -496,9 +501,13 @@ class TransportLayer(object): defer.returnValue((400, {"error": "Invalid transaction"})) return - code, response = yield self.received_handler.on_incoming_transaction( - transaction_data - ) + try: + code, response = yield self.received_handler.on_incoming_transaction( + transaction_data + ) + except: + logger.exception("on_incoming_transaction failed") + raise defer.returnValue((code, response)) diff --git a/synapse/federation/units.py b/synapse/federation/units.py index ecca35ac43..b2fb964180 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -40,7 +40,7 @@ class Pdu(JsonEncodedObject): { "pdu_id": "78c", - "ts": 1404835423000, + "origin_server_ts": 1404835423000, "origin": "bar", "prev_ids": [ ["23b", "foo"], @@ -55,7 +55,7 @@ class Pdu(JsonEncodedObject): "pdu_id", "context", "origin", - "ts", + "origin_server_ts", "pdu_type", "destinations", "transaction_id", @@ -82,7 +82,7 @@ class Pdu(JsonEncodedObject): "pdu_id", "context", "origin", - "ts", + "origin_server_ts", "pdu_type", "content", ] @@ -118,6 +118,7 @@ class Pdu(JsonEncodedObject): """ if pdu_tuple: d = copy.copy(pdu_tuple.pdu_entry._asdict()) + d["origin_server_ts"] = d.pop("ts") d["content"] = json.loads(d["content_json"]) del d["content_json"] @@ -156,11 +157,15 @@ class Edu(JsonEncodedObject): ] required_keys = [ - "origin", - "destination", "edu_type", ] +# TODO: SYN-103: Remove "origin" and "destination" keys. +# internal_keys = [ +# "origin", +# "destination", +# ] + class Transaction(JsonEncodedObject): """ A transaction is a list of Pdus and Edus to be sent to a remote home @@ -182,7 +187,7 @@ class Transaction(JsonEncodedObject): "transaction_id", "origin", "destination", - "ts", + "origin_server_ts", "previous_ids", "pdus", "edus", @@ -199,7 +204,7 @@ class Transaction(JsonEncodedObject): "transaction_id", "origin", "destination", - "ts", + "origin_server_ts", "pdus", ] @@ -221,10 +226,10 @@ class Transaction(JsonEncodedObject): @staticmethod def create_new(pdus, **kwargs): """ Used to create a new transaction. Will auto fill out - transaction_id and ts keys. + transaction_id and origin_server_ts keys. """ - if "ts" not in kwargs: - raise KeyError("Require 'ts' to construct a Transaction") + if "origin_server_ts" not in kwargs: + raise KeyError("Require 'origin_server_ts' to construct a Transaction") if "transaction_id" not in kwargs: raise KeyError( "Require 'transaction_id' to construct a Transaction" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4aaf97a83e..65861033e9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -64,7 +64,7 @@ class MessageHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def send_message(self, event=None, suppress_auth=False, stamp_event=True): + def send_message(self, event=None, suppress_auth=False): """ Send a message. Args: @@ -72,7 +72,6 @@ class MessageHandler(BaseHandler): suppress_auth (bool) : True to suppress auth for this message. This is primarily so the home server can inject messages into rooms at will. - stamp_event (bool) : True to stamp event content with server keys. Raises: SynapseError if something went wrong. """ @@ -82,9 +81,6 @@ class MessageHandler(BaseHandler): user = self.hs.parse_userid(event.user_id) assert user.is_mine, "User must be our own: %s" % (user,) - if stamp_event: - event.content["hsob_ts"] = int(self.clock.time_msec()) - snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) yield self._on_new_room_event( @@ -131,7 +127,7 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def store_room_data(self, event=None, stamp_event=True): + def store_room_data(self, event=None): """ Stores data for a room. Args: @@ -148,9 +144,6 @@ class MessageHandler(BaseHandler): state_key=event.state_key, ) - if stamp_event: - event.content["hsob_ts"] = int(self.clock.time_msec()) - yield self._on_new_room_event(event, snapshot) @defer.inlineCallbacks @@ -216,10 +209,7 @@ class MessageHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def send_feedback(self, event, stamp_event=True): - if stamp_event: - event.content["hsob_ts"] = int(self.clock.time_msec()) - + def send_feedback(self, event): snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) # store message in db diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index df562aa762..94b7890b5e 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -15,6 +15,7 @@ """Contains functions for registering clients.""" from twisted.internet import defer +from twisted.python import log from synapse.types import UserID from synapse.api.errors import ( @@ -126,7 +127,7 @@ class RegistrationHandler(BaseHandler): try: threepid = yield self._threepid_from_creds(c) except: - logger.err() + log.err() raise RegistrationError(400, "Couldn't validate 3pid") if not threepid: diff --git a/synapse/http/client.py b/synapse/http/client.py index 9f54b74e3a..46c90dbb76 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -101,7 +101,9 @@ class BaseHttpClient(object): while True: - producer = body_callback(method, url_bytes, headers_dict) + producer = None + if body_callback: + producer = body_callback(method, url_bytes, headers_dict) try: response = yield self.agent.request( @@ -177,10 +179,6 @@ class MatrixHttpClient(BaseHttpClient): request = sign_json(request, self.server_name, self.signing_key) - from syutil.jsonutil import encode_canonical_json - logger.debug("Signing " + " " * 11 + "%s %s", - self.server_name, encode_canonical_json(request)) - auth_headers = [] for key,sig in request["signatures"][self.server_name].items(): @@ -316,6 +314,42 @@ class IdentityServerHttpClient(BaseHttpClient): defer.returnValue(json.loads(body)) + @defer.inlineCallbacks + def get_json(self, destination, path, args={}, retry_on_dns_fail=True): + """ Get's some json from the given host homeserver and path + + Args: + destination (str): The remote server to send the HTTP request + to. + path (str): The HTTP path. + args (dict): A dictionary used to create query strings, defaults to + None. + **Note**: The value of each key is assumed to be an iterable + and *not* a string. + + Returns: + Deferred: Succeeds when we get *any* HTTP response. + + The result of the deferred is a tuple of `(code, response)`, + where `response` is a dict representing the decoded JSON body. + """ + logger.debug("get_json args: %s", args) + + query_bytes = urllib.urlencode(args, True) + logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) + + response = yield self._create_request( + destination.encode("ascii"), + "GET", + path.encode("ascii"), + query_bytes=query_bytes, + retry_on_dns_fail=retry_on_dns_fail, + body_callback=None + ) + + body = yield readBody(response) + + defer.returnValue(json.loads(body)) class CaptchaServerHttpClient(MatrixHttpClient): """Separate HTTP client for talking to google's captcha servers""" diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py index 7fc8ce4404..138cc88a05 100644 --- a/synapse/rest/presence.py +++ b/synapse/rest/presence.py @@ -68,7 +68,7 @@ class PresenceStatusRestServlet(RestServlet): yield self.handlers.presence_handler.set_state( target_user=user, auth_user=auth_user, state=state) - defer.returnValue((200, "")) + defer.returnValue((200, {})) def on_OPTIONS(self, request): return (200, {}) @@ -141,7 +141,7 @@ class PresenceListRestServlet(RestServlet): yield defer.DeferredList(deferreds) - defer.returnValue((200, "")) + defer.returnValue((200, {})) def on_OPTIONS(self, request): return (200, {}) diff --git a/synapse/rest/voip.py b/synapse/rest/voip.py index 2e4627606f..0d0243a249 100644 --- a/synapse/rest/voip.py +++ b/synapse/rest/voip.py @@ -36,7 +36,7 @@ class VoipRestServlet(RestServlet): if not turnUris or not turnSecret or not userLifetime: defer.returnValue( (200, {}) ) - expiry = self.hs.get_clock().time_msec() + userLifetime + expiry = (self.hs.get_clock().time_msec() + userLifetime) / 1000 username = "%d:%s" % (expiry, auth_user.to_string()) mac = hmac.new(turnSecret, msg=username, digestmod=hashlib.sha1) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 10456688ef..b848630c0b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -66,7 +66,7 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 5 +SCHEMA_VERSION = 6 class _RollbackButIsFineException(Exception): @@ -157,6 +157,8 @@ class DataStore(RoomMemberStore, RoomStore, cols["unrecognized_keys"] = json.dumps(unrec_keys) + cols["ts"] = cols.pop("origin_server_ts") + logger.debug("Persisting: %s", repr(cols)) if pdu.is_state: @@ -454,10 +456,11 @@ def prepare_database(db_conn): db_conn.commit() else: + sql_script = "BEGIN TRANSACTION;" for sql_loc in SCHEMAS: - sql_script = read_schema(sql_loc) - - c.executescript(sql_script) + sql_script += read_schema(sql_loc) + sql_script += "COMMIT TRANSACTION;" + c.executescript(sql_script) db_conn.commit() c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index dba50f1213..65a86e9056 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -354,6 +354,7 @@ class SQLBaseStore(object): d.pop("stream_ordering", None) d.pop("topological_ordering", None) d.pop("processed", None) + d["origin_server_ts"] = d.pop("ts", 0) d.update(json.loads(row_dict["unrecognized_keys"])) d["content"] = json.loads(d["content"]) @@ -361,7 +362,7 @@ class SQLBaseStore(object): if "age_ts" not in d: # For compatibility - d["age_ts"] = d["ts"] if "ts" in d else 0 + d["age_ts"] = d.get("origin_server_ts", 0) return self.event_factory.create_event( etype=d["type"], diff --git a/synapse/storage/schema/delta/v6.sql b/synapse/storage/schema/delta/v6.sql new file mode 100644 index 0000000000..9bf2068d84 --- /dev/null +++ b/synapse/storage/schema/delta/v6.sql @@ -0,0 +1,31 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS server_tls_certificates( + server_name TEXT, -- Server name. + fingerprint TEXT, -- Certificate fingerprint. + from_server TEXT, -- Which key server the certificate was fetched from. + ts_added_ms INTEGER, -- When the certifcate was added. + tls_certificate BLOB, -- DER encoded x509 certificate. + CONSTRAINT uniqueness UNIQUE (server_name, fingerprint) +); + +CREATE TABLE IF NOT EXISTS server_signature_keys( + server_name TEXT, -- Server name. + key_id TEXT, -- Key version. + from_server TEXT, -- Which key server the key was fetched form. + ts_added_ms INTEGER, -- When the key was added. + verify_key BLOB, -- NACL verification key. + CONSTRAINT uniqueness UNIQUE (server_name, key_id) +); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index ab4599b468..2ba8e30efe 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -87,7 +87,8 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (code, response_json, transaction_id, origin)) - def prep_send_transaction(self, transaction_id, destination, ts, pdu_list): + def prep_send_transaction(self, transaction_id, destination, + origin_server_ts, pdu_list): """Persists an outgoing transaction and calculates the values for the previous transaction id list. @@ -97,7 +98,7 @@ class TransactionStore(SQLBaseStore): Args: transaction_id (str) destination (str) - ts (int) + origin_server_ts (int) pdu_list (list) Returns: @@ -106,11 +107,11 @@ class TransactionStore(SQLBaseStore): return self.runInteraction( self._prep_send_transaction, - transaction_id, destination, ts, pdu_list + transaction_id, destination, origin_server_ts, pdu_list ) - def _prep_send_transaction(self, txn, transaction_id, destination, ts, - pdu_list): + def _prep_send_transaction(self, txn, transaction_id, destination, + origin_server_ts, pdu_list): # First we find out what the prev_txs should be. # Since we know that we are only sending one transaction at a time, @@ -131,7 +132,7 @@ class TransactionStore(SQLBaseStore): None, transaction_id=transaction_id, destination=destination, - ts=ts, + ts=origin_server_ts, response_code=0, response_json=None )) |