diff options
Diffstat (limited to '')
30 files changed, 617 insertions, 497 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 06167e3c1a..895a0766d2 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -""" This is a reference implementation of a synapse home server. +""" This is a reference implementation of a Matrix home server. """ -__version__ = "0.6.0" +__version__ = "0.6.1b" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e99153988f..9b23c58abe 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -18,6 +18,8 @@ from synapse.storage import prepare_database, UpgradeDatabaseException from synapse.server import HomeServer +from synapse.python_dependencies import check_requirements + from twisted.internet import reactor from twisted.application import service from twisted.enterprise import adbapi @@ -40,6 +42,8 @@ from synapse.util.logcontext import LoggingContext from daemonize import Daemonize import twisted.manhole.telnet +import synapse + import logging import os import re @@ -199,7 +203,10 @@ def setup(config_options, should_run=True): config.setup_logging() + check_requirements() + logger.info("Server hostname: %s", config.server_name) + logger.info("Server version: %s", synapse.__version__) if re.search(":[0-9]+$", config.server_name): domain_with_port = config.server_name @@ -235,13 +242,20 @@ def setup(config_options, should_run=True): except UpgradeDatabaseException: sys.stderr.write( "\nFailed to upgrade database.\n" - "Have you checked for version specific instructions in UPGRADES.rst?\n" + "Have you checked for version specific instructions in" + " UPGRADES.rst?\n" ) sys.exit(1) logger.info("Database prepared in %s.", db_name) - hs.get_db_pool() + db_pool = hs.get_db_pool() + + if db_name == ":memory:": + # Memory databases will need to be setup each time they are opened. + reactor.callWhenRunning( + db_pool.runWithConnection, prepare_database + ) if config.manhole: f = twisted.manhole.telnet.ShellFactory() @@ -292,6 +306,7 @@ def run(): def main(): with LoggingContext("main"): + check_requirements() setup(sys.argv[1:]) diff --git a/synapse/config/database.py b/synapse/config/database.py index 0d33583a7d..daa161c952 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -20,7 +20,10 @@ import os class DatabaseConfig(Config): def __init__(self, args): super(DatabaseConfig, self).__init__(args) - self.database_path = self.abspath(args.database_path) + if args.database_path == ":memory:": + self.database_path = ":memory:" + else: + self.database_path = self.abspath(args.database_path) @classmethod def add_arguments(cls, parser): diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 15383b3184..f9568ebd21 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -66,7 +66,10 @@ class LoggingConfig(Config): formatter = logging.Formatter(log_format) if self.log_file: - handler = logging.FileHandler(self.log_file) + # TODO: Customisable file size / backup count + handler = logging.handlers.RotatingFileHandler( + self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3 + ) else: handler = logging.StreamHandler() handler.setFormatter(formatter) diff --git a/synapse/config/server.py b/synapse/config/server.py index 4f73c85466..31e44cc857 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -47,8 +47,12 @@ class ServerConfig(Config): def add_arguments(cls, parser): super(ServerConfig, cls).add_arguments(parser) server_group = parser.add_argument_group("server") - server_group.add_argument("-H", "--server-name", default="localhost", - help="The name of the server") + server_group.add_argument( + "-H", "--server-name", default="localhost", + help="The domain name of the server, with optional explicit port. " + "This is used by remote servers to connect to this server, " + "e.g. matrix.org, localhost:8080, etc." + ) server_group.add_argument("--signing-key-path", help="The signing key to sign messages with") server_group.add_argument("-p", "--bind-port", metavar="PORT", diff --git a/synapse/events/builder.py b/synapse/events/builder.py index d4cb602ebb..a9b1b99a10 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -33,12 +33,6 @@ class EventBuilder(EventBase): unsigned=unsigned ) - def update_event_key(self, key, value): - self._event_dict[key] = value - - def update_event_keys(self, other_dict): - self._event_dict.update(other_dict) - def build(self): return FrozenEvent.from_event(self) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 4f4914467c..bcb5457278 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -89,16 +89,24 @@ def prune_event(event): return type(event)(allowed_fields) -def serialize_event(hs, e): +def serialize_event(hs, e, client_event=True): # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e # Should this strip out None's? d = {k: v for k, v in e.get_dict().items()} + + if not client_event: + # set the age and keep all other keys + if "age_ts" in d["unsigned"]: + now = int(hs.get_clock().time_msec()) + d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] + return d + if "age_ts" in d["unsigned"]: now = int(hs.get_clock().time_msec()) - d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] + d["age"] = now - d["unsigned"]["age_ts"] del d["unsigned"]["age_ts"] d["user_id"] = d.pop("sender", None) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a4c29b484b..6620532a60 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -256,23 +256,21 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context, event_id): - """Requests all of the `current` state PDUs for a given context from + def get_state_for_room(self, destination, room_id, event_id): + """Requests all of the `current` state PDUs for a given room from a remote home server. Args: destination (str): The remote homeserver to query for the state. - context (str): The context we're interested in. + room_id (str): The id of the room we're interested in. event_id (str): The id of the event we want the state at. Returns: Deferred: Results in a list of PDUs. """ - result = yield self.transport_layer.get_context_state( - destination, - context, - event_id=event_id, + result = yield self.transport_layer.get_room_state( + destination, room_id, event_id=event_id, ) pdus = [ @@ -288,9 +286,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_event_auth(self, destination, context, event_id): + def get_event_auth(self, destination, room_id, event_id): res = yield self.transport_layer.get_event_auth( - destination, context, event_id, + destination, room_id, event_id, ) auth_chain = [ @@ -304,9 +302,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_backfill_request(self, origin, context, versions, limit): + def on_backfill_request(self, origin, room_id, versions, limit): pdus = yield self.handler.on_backfill_request( - origin, context, versions, limit + origin, room_id, versions, limit ) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -380,12 +378,10 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, origin, context, event_id): + def on_context_state_request(self, origin, room_id, event_id): if event_id: pdus = yield self.handler.get_state_for_pdu( - origin, - context, - event_id, + origin, room_id, event_id, ) auth_chain = yield self.store.get_auth_chain( [pdu.event_id for pdu in pdus] @@ -413,7 +409,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function def on_pull_request(self, origin, versions): - raise NotImplementedError("Pull transacions not implemented") + raise NotImplementedError("Pull transactions not implemented") @defer.inlineCallbacks def on_query_request(self, query_type, args): @@ -422,30 +418,21 @@ class ReplicationLayer(object): defer.returnValue((200, response)) else: defer.returnValue( - (404, "No handler for Query type '%s'" % (query_type, )) + (404, "No handler for Query type '%s'" % (query_type,)) ) @defer.inlineCallbacks - def on_make_join_request(self, context, user_id): - pdu = yield self.handler.on_make_join_request(context, user_id) + def on_make_join_request(self, room_id, user_id): + pdu = yield self.handler.on_make_join_request(room_id, user_id) time_now = self._clock.time_msec() - defer.returnValue({ - "event": pdu.get_pdu_json(time_now), - }) + defer.returnValue({"event": pdu.get_pdu_json(time_now)}) @defer.inlineCallbacks def on_invite_request(self, origin, content): pdu = self.event_from_pdu_json(content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) time_now = self._clock.time_msec() - defer.returnValue( - ( - 200, - { - "event": ret_pdu.get_pdu_json(time_now), - } - ) - ) + defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) @defer.inlineCallbacks def on_send_join_request(self, origin, content): @@ -462,26 +449,17 @@ class ReplicationLayer(object): })) @defer.inlineCallbacks - def on_event_auth(self, origin, context, event_id): + def on_event_auth(self, origin, room_id, event_id): time_now = self._clock.time_msec() auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue( - ( - 200, - { - "auth_chain": [ - a.get_pdu_json(time_now) for a in auth_pdus - ], - } - ) - ) + defer.returnValue((200, { + "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + })) @defer.inlineCallbacks - def make_join(self, destination, context, user_id): + def make_join(self, destination, room_id, user_id): ret = yield self.transport_layer.make_join( - destination=destination, - context=context, - user_id=user_id, + destination, room_id, user_id ) pdu_dict = ret["event"] @@ -494,10 +472,10 @@ class ReplicationLayer(object): def send_join(self, destination, pdu): time_now = self._clock.time_msec() _, content = yield self.transport_layer.send_join( - destination, - pdu.room_id, - pdu.event_id, - pdu.get_pdu_json(time_now), + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), ) logger.debug("Got content: %s", content) @@ -507,9 +485,6 @@ class ReplicationLayer(object): for p in content.get("state", []) ] - # FIXME: We probably want to do something with the auth_chain given - # to us - auth_chain = [ self.event_from_pdu_json(p, outlier=True) for p in content.get("auth_chain", []) @@ -523,11 +498,11 @@ class ReplicationLayer(object): }) @defer.inlineCallbacks - def send_invite(self, destination, context, event_id, pdu): + def send_invite(self, destination, room_id, event_id, pdu): time_now = self._clock.time_msec() code, content = yield self.transport_layer.send_invite( destination=destination, - context=context, + room_id=room_id, event_id=event_id, content=pdu.get_pdu_json(time_now), ) @@ -657,7 +632,7 @@ class ReplicationLayer(object): "_handle_new_pdu getting state for %s", pdu.room_id ) - state, auth_chain = yield self.get_state_for_context( + state, auth_chain = yield self.get_state_for_room( origin, pdu.room_id, pdu.event_id, ) @@ -816,7 +791,7 @@ class _TransactionQueue(object): logger.info("TX [%s] is ready for retry", destination) logger.info("TX [%s] _attempt_new_transaction", destination) - + if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -830,14 +805,15 @@ class _TransactionQueue(object): pending_failures = self.pending_failures_by_dest.pop(destination, []) if pending_pdus: - logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) + logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) if not pending_pdus and not pending_edus and not pending_failures: return logger.debug( - "TX [%s] Attempting new transaction " - "(pdus: %d, edus: %d, failures: %d)", + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", destination, len(pending_pdus), len(pending_edus), diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py new file mode 100644 index 0000000000..6800ac46c5 --- /dev/null +++ b/synapse/federation/transport/__init__.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The transport layer is responsible for both sending transactions to remote +home servers and receiving a variety of requests from other home servers. + +By default this is done over HTTPS (and all home servers are required to +support HTTPS), however individual pairings of servers may decide to +communicate over a different (albeit still reliable) protocol. +""" + +from .server import TransportLayerServer +from .client import TransportLayerClient + + +class TransportLayer(TransportLayerServer, TransportLayerClient): + """This is a basic implementation of the transport layer that translates + transactions and other requests to/from HTTP. + + Attributes: + server_name (str): Local home server host + + server (synapse.http.server.HttpServer): the http server to + register listeners on + + client (synapse.http.client.HttpClient): the http client used to + send requests + + request_handler (TransportRequestHandler): The handler to fire when we + receive requests for data. + + received_handler (TransportReceivedHandler): The handler to fire when + we receive data. + """ + + def __init__(self, homeserver, server_name, server, client): + """ + Args: + server_name (str): Local home server host + server (synapse.protocol.http.HttpServer): the http server to + register listeners on + client (synapse.protocol.http.HttpClient): the http client used to + send requests + """ + self.keyring = homeserver.get_keyring() + self.server_name = server_name + self.server = server + self.client = client + self.request_handler = None + self.received_handler = None diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py new file mode 100644 index 0000000000..e634a3a213 --- /dev/null +++ b/synapse/federation/transport/client.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.urls import FEDERATION_PREFIX as PREFIX +from synapse.util.logutils import log_function + +import logging +import json + + +logger = logging.getLogger(__name__) + + +class TransportLayerClient(object): + """Sends federation HTTP requests to other servers""" + + @log_function + def get_room_state(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + context (str): The name of the context we want the state of + event_id (str): The event we want the context at. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_room_state dest=%s, room=%s", + destination, room_id) + + path = PREFIX + "/state/%s/" % room_id + return self.client.get_json( + destination, path=path, args={"event_id": event_id}, + ) + + @log_function + def get_event(self, destination, event_id): + """ Requests the pdu with give id and origin from the given server. + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + event_id (str): The id of the event being requested. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_pdu dest=%s, event_id=%s", + destination, event_id) + + path = PREFIX + "/event/%s/" % (event_id, ) + return self.client.get_json(destination, path=path) + + @log_function + def backfill(self, destination, room_id, event_tuples, limit): + """ Requests `limit` previous PDUs in a given context before list of + PDUs. + + Args: + dest (str) + room_id (str) + event_tuples (list) + limt (int) + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug( + "backfill dest=%s, room_id=%s, event_tuples=%s, limit=%s", + destination, room_id, repr(event_tuples), str(limit) + ) + + if not event_tuples: + # TODO: raise? + return + + path = PREFIX + "/backfill/%s/" % (room_id,) + + args = { + "v": event_tuples, + "limit": [str(limit)], + } + + return self.client.get_json( + destination, + path=path, + args=args, + ) + + @defer.inlineCallbacks + @log_function + def send_transaction(self, transaction, json_data_callback=None): + """ Sends the given Transaction to its destination + + Args: + transaction (Transaction) + + Returns: + Deferred: Results of the deferred is a tuple in the form of + (response_code, response_body) where the response_body is a + python dict decoded from json + """ + logger.debug( + "send_data dest=%s, txid=%s", + transaction.destination, transaction.transaction_id + ) + + if transaction.destination == self.server_name: + raise RuntimeError("Transport layer cannot send to itself!") + + # FIXME: This is only used by the tests. The actual json sent is + # generated by the json_data_callback. + json_data = transaction.get_dict() + + code, response = yield self.client.put_json( + transaction.destination, + path=PREFIX + "/send/%s/" % transaction.transaction_id, + data=json_data, + json_data_callback=json_data_callback, + ) + + logger.debug( + "send_data dest=%s, txid=%s, got response: %d", + transaction.destination, transaction.transaction_id, code + ) + + defer.returnValue((code, response)) + + @defer.inlineCallbacks + @log_function + def make_query(self, destination, query_type, args, retry_on_dns_fail): + path = PREFIX + "/query/%s" % query_type + + response = yield self.client.get_json( + destination=destination, + path=path, + args=args, + retry_on_dns_fail=retry_on_dns_fail, + ) + + defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True): + path = PREFIX + "/make_join/%s/%s" % (room_id, user_id) + + response = yield self.client.get_json( + destination=destination, + path=path, + retry_on_dns_fail=retry_on_dns_fail, + ) + + defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def send_join(self, destination, room_id, event_id, content): + path = PREFIX + "/send_join/%s/%s" % (room_id, event_id) + + code, content = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_join", code) + + defer.returnValue(json.loads(content)) + + @defer.inlineCallbacks + @log_function + def send_invite(self, destination, room_id, event_id, content): + path = PREFIX + "/invite/%s/%s" % (room_id, event_id) + + code, content = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) + + @defer.inlineCallbacks + @log_function + def get_event_auth(self, destination, room_id, event_id): + path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id) + + response = yield self.client.get_json( + destination=destination, + path=path, + ) + + defer.returnValue(response) diff --git a/synapse/federation/transport.py b/synapse/federation/transport/server.py index 1f0f06e0fe..a380a6910b 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport/server.py @@ -13,14 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""The transport layer is responsible for both sending transactions to remote -home servers and receiving a variety of requests from other home servers. - -Typically, this is done over HTTP (and all home servers are required to -support HTTP), however individual pairings of servers may decide to communicate -over a different (albeit still reliable) protocol. -""" - from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX @@ -35,241 +27,8 @@ import re logger = logging.getLogger(__name__) -class TransportLayer(object): - """This is a basic implementation of the transport layer that translates - transactions and other requests to/from HTTP. - - Attributes: - server_name (str): Local home server host - - server (synapse.http.server.HttpServer): the http server to - register listeners on - - client (synapse.http.client.HttpClient): the http client used to - send requests - - request_handler (TransportRequestHandler): The handler to fire when we - receive requests for data. - - received_handler (TransportReceivedHandler): The handler to fire when - we receive data. - """ - - def __init__(self, homeserver, server_name, server, client): - """ - Args: - server_name (str): Local home server host - server (synapse.protocol.http.HttpServer): the http server to - register listeners on - client (synapse.protocol.http.HttpClient): the http client used to - send requests - """ - self.keyring = homeserver.get_keyring() - self.server_name = server_name - self.server = server - self.client = client - self.request_handler = None - self.received_handler = None - - @log_function - def get_context_state(self, destination, context, event_id=None): - """ Requests all state for a given context (i.e. room) from the - given server. - - Args: - destination (str): The host name of the remote home server we want - to get the state from. - context (str): The name of the context we want the state of - - Returns: - Deferred: Results in a dict received from the remote homeserver. - """ - logger.debug("get_context_state dest=%s, context=%s", - destination, context) - - subpath = "/state/%s/" % context - - args = {} - if event_id: - args["event_id"] = event_id - - return self._do_request_for_transaction( - destination, subpath, args=args - ) - - @log_function - def get_event(self, destination, event_id): - """ Requests the pdu with give id and origin from the given server. - - Args: - destination (str): The host name of the remote home server we want - to get the state from. - event_id (str): The id of the event being requested. - - Returns: - Deferred: Results in a dict received from the remote homeserver. - """ - logger.debug("get_pdu dest=%s, event_id=%s", - destination, event_id) - - subpath = "/event/%s/" % (event_id, ) - - return self._do_request_for_transaction(destination, subpath) - - @log_function - def backfill(self, dest, context, event_tuples, limit): - """ Requests `limit` previous PDUs in a given context before list of - PDUs. - - Args: - dest (str) - context (str) - event_tuples (list) - limt (int) - - Returns: - Deferred: Results in a dict received from the remote homeserver. - """ - logger.debug( - "backfill dest=%s, context=%s, event_tuples=%s, limit=%s", - dest, context, repr(event_tuples), str(limit) - ) - - if not event_tuples: - # TODO: raise? - return - - subpath = "/backfill/%s/" % (context,) - - args = { - "v": event_tuples, - "limit": [str(limit)], - } - - return self._do_request_for_transaction( - dest, - subpath, - args=args, - ) - - @defer.inlineCallbacks - @log_function - def send_transaction(self, transaction, json_data_callback=None): - """ Sends the given Transaction to its destination - - Args: - transaction (Transaction) - - Returns: - Deferred: Results of the deferred is a tuple in the form of - (response_code, response_body) where the response_body is a - python dict decoded from json - """ - logger.debug( - "send_data dest=%s, txid=%s", - transaction.destination, transaction.transaction_id - ) - - if transaction.destination == self.server_name: - raise RuntimeError("Transport layer cannot send to itself!") - - # FIXME: This is only used by the tests. The actual json sent is - # generated by the json_data_callback. - json_data = transaction.get_dict() - - code, response = yield self.client.put_json( - transaction.destination, - path=PREFIX + "/send/%s/" % transaction.transaction_id, - data=json_data, - json_data_callback=json_data_callback, - ) - - logger.debug( - "send_data dest=%s, txid=%s, got response: %d", - transaction.destination, transaction.transaction_id, code - ) - - defer.returnValue((code, response)) - - @defer.inlineCallbacks - @log_function - def make_query(self, destination, query_type, args, retry_on_dns_fail): - path = PREFIX + "/query/%s" % query_type - - response = yield self.client.get_json( - destination=destination, - path=path, - args=args, - retry_on_dns_fail=retry_on_dns_fail, - ) - - defer.returnValue(response) - - @defer.inlineCallbacks - @log_function - def make_join(self, destination, context, user_id, retry_on_dns_fail=True): - path = PREFIX + "/make_join/%s/%s" % (context, user_id,) - - response = yield self.client.get_json( - destination=destination, - path=path, - retry_on_dns_fail=retry_on_dns_fail, - ) - - defer.returnValue(response) - - @defer.inlineCallbacks - @log_function - def send_join(self, destination, context, event_id, content): - path = PREFIX + "/send_join/%s/%s" % ( - context, - event_id, - ) - - code, content = yield self.client.put_json( - destination=destination, - path=path, - data=content, - ) - - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_join", code) - - defer.returnValue(json.loads(content)) - - @defer.inlineCallbacks - @log_function - def send_invite(self, destination, context, event_id, content): - path = PREFIX + "/invite/%s/%s" % ( - context, - event_id, - ) - - code, content = yield self.client.put_json( - destination=destination, - path=path, - data=content, - ) - - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) - - @defer.inlineCallbacks - @log_function - def get_event_auth(self, destination, context, event_id): - path = PREFIX + "/event_auth/%s/%s" % ( - context, - event_id, - ) - - response = yield self.client.get_json( - destination=destination, - path=path, - ) - - defer.returnValue(response) +class TransportLayerServer(object): + """Handles incoming federation HTTP requests""" @defer.inlineCallbacks def _authenticate_request(self, request): @@ -373,8 +132,6 @@ class TransportLayer(object): """ self.request_handler = handler - # TODO(markjh): Namespace the federation URI paths - # This is for when someone asks us for everything since version X self.server.register_path( "GET", @@ -528,34 +285,6 @@ class TransportLayer(object): defer.returnValue((code, response)) - @defer.inlineCallbacks - @log_function - def _do_request_for_transaction(self, destination, subpath, args={}): - """ - Args: - destination (str) - path (str) - args (dict): This is parsed directly to the HttpClient. - - Returns: - Deferred: Results in a dict. - """ - - data = yield self.client.get_json( - destination, - path=PREFIX + subpath, - args=args, - ) - - # Add certain keys to the JSON, ready for decoding as a Transaction - data.update( - origin=destination, - destination=self.server_name, - transaction_id=None - ) - - defer.returnValue(data) - @log_function def _on_backfill_request(self, origin, context, v_list, limits): if not limits: diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 38af034b4d..f33d17a31e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -144,7 +144,5 @@ class BaseHandler(object): yield self.notifier.on_new_room_event(event, extra_users=extra_users) yield federation_handler.handle_new_event( - event, - None, - destinations=destinations, + event, destinations=destinations, ) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 808219bd10..103bc67c42 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -46,7 +46,8 @@ class EventStreamHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_stream(self, auth_user_id, pagin_config, timeout=0): + def get_stream(self, auth_user_id, pagin_config, timeout=0, + as_client_event=True): auth_user = self.hs.parse_userid(auth_user_id) try: @@ -69,16 +70,16 @@ class EventStreamHandler(BaseHandler): pagin_config.from_token = None rm_handler = self.hs.get_handlers().room_member_handler - logger.debug("BETA") room_ids = yield rm_handler.get_rooms_for_user(auth_user) - logger.debug("ALPHA") with PreserveLoggingContext(): events, tokens = yield self.notifier.get_events_for( auth_user, room_ids, pagin_config, timeout ) - chunks = [self.hs.serialize_event(e) for e in events] + chunks = [ + self.hs.serialize_event(e, as_client_event) for e in events + ] chunk = { "chunk": chunks, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d26975a88a..81203bf1a3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -75,14 +75,14 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def handle_new_event(self, event, snapshot, destinations): + def handle_new_event(self, event, destinations): """ Takes in an event from the client to server side, that has already been authed and handled by the state module, and sends it to any remote home servers that may be interested. Args: - event - snapshot (.storage.Snapshot): THe snapshot the event happened after + event: The event to send + destinations: A list of destinations to send it to Returns: Deferred: Resolved when it has successfully been queued for @@ -154,7 +154,7 @@ class FederationHandler(BaseHandler): replication = self.replication_layer if not state: - state, auth_chain = yield replication.get_state_for_context( + state, auth_chain = yield replication.get_state_for_room( origin, context=event.room_id, event_id=event.event_id, ) @@ -281,7 +281,7 @@ class FederationHandler(BaseHandler): """ pdu = yield self.replication_layer.send_invite( destination=target_host, - context=event.room_id, + room_id=event.room_id, event_id=event.event_id, pdu=event ) @@ -617,13 +617,13 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_backfill_request(self, origin, context, pdu_list, limit): - in_room = yield self.auth.check_host_in_room(context, origin) + def on_backfill_request(self, origin, room_id, pdu_list, limit): + in_room = yield self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") events = yield self.store.get_backfill_events( - context, + room_id, pdu_list, limit ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7195de98b5..f2a2f16933 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -67,7 +67,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False): + feedback=False, as_client_event=True): """Get messages in a room. Args: @@ -76,6 +76,7 @@ class MessageHandler(BaseHandler): pagin_config (synapse.api.streams.PaginationConfig): The pagination config rules to apply, if any. feedback (bool): True to get compressed feedback with the messages + as_client_event (bool): True to get events in client-server format. Returns: dict: Pagination API results """ @@ -99,7 +100,9 @@ class MessageHandler(BaseHandler): ) chunk = { - "chunk": [self.hs.serialize_event(e) for e in events], + "chunk": [ + self.hs.serialize_event(e, as_client_event) for e in events + ], "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), } @@ -211,7 +214,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, - feedback=False): + feedback=False, as_client_event=True): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is @@ -222,6 +225,7 @@ class MessageHandler(BaseHandler): pagin_config (synapse.api.streams.PaginationConfig): The pagination config used to determine how many messages *PER ROOM* to return. feedback (bool): True to get feedback along with these messages. + as_client_event (bool): True to get events in client-server format. Returns: A list of dicts with "room_id" and "membership" keys for all rooms the user is currently invited or joined in on. Rooms where the user @@ -280,7 +284,10 @@ class MessageHandler(BaseHandler): end_token = now_token.copy_and_replace("room_key", token[1]) d["messages"] = { - "chunk": [self.hs.serialize_event(m) for m in messages], + "chunk": [ + self.hs.serialize_event(m, as_client_event) + for m in messages + ], "start": start_token.to_string(), "end": end_token.to_string(), } diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 59719a1fae..6d0db18e51 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -425,10 +425,22 @@ class RoomMemberHandler(BaseHandler): event.room_id, self.hs.hostname ) + if not is_host_in_room: + # is *anyone* in the room? + room_member_keys = [ + v for (k, v) in context.current_state.keys() if ( + k == "m.room.member" + ) + ] + if len(room_member_keys) == 0: + # has the room been created so we can join it? + create_event = context.current_state.get(("m.room.create", "")) + if create_event: + is_host_in_room = True if is_host_in_room: should_do_dance = False - elif room_host: + elif room_host: # TODO: Shouldn't this be remote_room_host? should_do_dance = True else: # TODO(markjh): get prev_state from snapshot @@ -442,7 +454,8 @@ class RoomMemberHandler(BaseHandler): should_do_dance = not self.hs.is_mine(inviter) room_host = inviter.domain else: - should_do_dance = False + # return the same error as join_room_alias does + raise SynapseError(404, "No known servers") if should_do_dance: handler = self.hs.get_handlers().federation_handler diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index ab698b36e1..cd9638dd04 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -83,9 +83,15 @@ class TypingNotificationHandler(BaseHandler): if member in self._member_typing_timer: self.clock.cancel_call_later(self._member_typing_timer[member]) + def _cb(): + logger.debug( + "%s has timed out in %s", target_user.to_string(), room_id + ) + self._stopped_typing(member) + self._member_typing_until[member] = until self._member_typing_timer[member] = self.clock.call_later( - timeout / 1000, lambda: self._stopped_typing(member) + timeout / 1000.0, _cb ) if was_present: @@ -114,6 +120,10 @@ class TypingNotificationHandler(BaseHandler): member = RoomMember(room_id=room_id, user=target_user) + if member in self._member_typing_timer: + self.clock.cancel_call_later(self._member_typing_timer[member]) + del self._member_typing_timer[member] + yield self._stopped_typing(member) @defer.inlineCallbacks @@ -136,8 +146,10 @@ class TypingNotificationHandler(BaseHandler): del self._member_typing_until[member] - self.clock.cancel_call_later(self._member_typing_timer[member]) - del self._member_typing_timer[member] + if member in self._member_typing_timer: + # Don't cancel it - either it already expired, or the real + # stopped_typing() will cancel it + del self._member_typing_timer[member] @defer.inlineCallbacks def _push_update(self, room_id, user, typing): diff --git a/synapse/http/client.py b/synapse/http/client.py index e5d4939e2d..7793bab106 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -53,7 +53,7 @@ class SimpleHttpClient(object): uri.encode("ascii"), headers=Headers({ b"Content-Type": [b"application/x-www-form-urlencoded"], - b"User-Agent": AGENT_NAME, + b"User-Agent": [AGENT_NAME], }), bodyProducer=FileBodyProducer(StringIO(query_bytes)) ) @@ -89,7 +89,7 @@ class SimpleHttpClient(object): "GET", uri.encode("ascii"), headers=Headers({ - b"User-Agent": AGENT_NAME, + b"User-Agent": [AGENT_NAME], }) ) @@ -114,7 +114,7 @@ class CaptchaServerHttpClient(SimpleHttpClient): bodyProducer=FileBodyProducer(StringIO(query_bytes)), headers=Headers({ b"Content-Type": [b"application/x-www-form-urlencoded"], - b"User-Agent": AGENT_NAME, + b"User-Agent": [AGENT_NAME], }) ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index aa14782b0f..1dda3ba2c7 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -72,7 +72,6 @@ class MatrixFederationHttpClient(object): requests. """ - def __init__(self, hs): self.hs = hs self.signing_key = hs.config.signing_key[0] diff --git a/synapse/media/v1/__init__.py b/synapse/media/v1/__init__.py index 619999d268..d6c6690577 100644 --- a/synapse/media/v1/__init__.py +++ b/synapse/media/v1/__init__.py @@ -22,7 +22,8 @@ except IOError as e: if str(e).startswith("decoder jpeg not available"): raise Exception( "FATAL: jpeg codec not supported. Install pillow correctly! " - " 'sudo apt-get install libjpeg-dev' then 'pip install -I pillow'" + " 'sudo apt-get install libjpeg-dev' then 'pip uninstall pillow &&" + " pip install pillow --user'" ) except Exception: # any other exception is fine @@ -36,7 +37,8 @@ except IOError as e: if str(e).startswith("decoder zip not available"): raise Exception( "FATAL: zip codec not supported. Install pillow correctly! " - " 'sudo apt-get install libjpeg-dev' then 'pip install -I pillow'" + " 'sudo apt-get install libjpeg-dev' then 'pip uninstall pillow &&" + " pip install pillow --user'" ) except Exception: # any other exception is fine diff --git a/synapse/media/v1/thumbnailer.py b/synapse/media/v1/thumbnailer.py index bc86efea8f..28404f2b7b 100644 --- a/synapse/media/v1/thumbnailer.py +++ b/synapse/media/v1/thumbnailer.py @@ -82,7 +82,7 @@ class Thumbnailer(object): def save_image(self, output_image, output_type, output_path): output_bytes_io = BytesIO() - output_image.save(output_bytes_io, self.FORMATS[output_type]) + output_image.save(output_bytes_io, self.FORMATS[output_type], quality=70) output_bytes = output_bytes_io.getvalue() with open(output_path, "wb") as output_file: output_file.write(output_bytes) diff --git a/synapse/notifier.py b/synapse/notifier.py index b9d52d0c4c..3aec1d4af2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -244,14 +244,14 @@ class Notifier(object): ) if timeout: - self.clock.call_later(timeout/1000.0, _timeout_listener) - self._register_with_keys(listener) yield self._check_for_updates(listener) if not timeout: _timeout_listener() + else: + self.clock.call_later(timeout/1000.0, _timeout_listener) return diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py new file mode 100644 index 0000000000..b1fae991e0 --- /dev/null +++ b/synapse/python_dependencies.py @@ -0,0 +1,80 @@ +import logging +from distutils.version import LooseVersion + +logger = logging.getLogger(__name__) + +REQUIREMENTS = { + "syutil==0.0.2": ["syutil"], + "matrix_angular_sdk==0.6.0": ["syweb==0.6.0"], + "Twisted>=14.0.0": ["twisted>=14.0.0"], + "service_identity>=1.0.0": ["service_identity>=1.0.0"], + "pyopenssl>=0.14": ["OpenSSL>=0.14"], + "pyyaml": ["yaml"], + "pyasn1": ["pyasn1"], + "pynacl": ["nacl"], + "daemonize": ["daemonize"], + "py-bcrypt": ["bcrypt"], + "frozendict>=0.4": ["frozendict"], + "pillow": ["PIL"], +} + + +class MissingRequirementError(Exception): + pass + + +def check_requirements(): + """Checks that all the modules needed by synapse have been correctly + installed and are at the correct version""" + for dependency, module_requirements in REQUIREMENTS.items(): + for module_requirement in module_requirements: + if ">=" in module_requirement: + module_name, required_version = module_requirement.split(">=") + version_test = ">=" + elif "==" in module_requirement: + module_name, required_version = module_requirement.split("==") + version_test = "==" + else: + module_name = module_requirement + version_test = None + + try: + module = __import__(module_name) + except ImportError: + logging.exception( + "Can't import %r which is part of %r", + module_name, dependency + ) + raise MissingRequirementError( + "Can't import %r which is part of %r" + % (module_name, dependency) + ) + version = getattr(module, "__version__", None) + file_path = getattr(module, "__file__", None) + logger.info( + "Using %r version %r from %r to satisfy %r", + module_name, version, file_path, dependency + ) + + if version_test == ">=": + if version is None: + raise MissingRequirementError( + "Version of %r isn't set as __version__ of module %r" + % (dependency, module_name) + ) + if LooseVersion(version) < LooseVersion(required_version): + raise MissingRequirementError( + "Version of %r in %r is too old. %r < %r" + % (dependency, file_path, version, required_version) + ) + elif version_test == "==": + if version is None: + raise MissingRequirementError( + "Version of %r isn't set as __version__ of module %r" + % (dependency, module_name) + ) + if LooseVersion(version) != LooseVersion(required_version): + raise MissingRequirementError( + "Unexpected version of %r in %r. %r != %r" + % (dependency, file_path, version, required_version) + ) diff --git a/synapse/rest/events.py b/synapse/rest/events.py index cf6d13f817..bedcb2bcc6 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -44,8 +44,11 @@ class EventStreamRestServlet(RestServlet): except ValueError: raise SynapseError(400, "timeout must be in milliseconds.") + as_client_event = "raw" not in request.args + chunk = yield handler.get_stream( - auth_user.to_string(), pagin_config, timeout=timeout + auth_user.to_string(), pagin_config, timeout=timeout, + as_client_event=as_client_event ) except: logger.exception("Event stream failed") diff --git a/synapse/rest/initial_sync.py b/synapse/rest/initial_sync.py index a571589581..b13d56b286 100644 --- a/synapse/rest/initial_sync.py +++ b/synapse/rest/initial_sync.py @@ -27,12 +27,15 @@ class InitialSyncRestServlet(RestServlet): def on_GET(self, request): user = yield self.auth.get_user_by_req(request) with_feedback = "feedback" in request.args + as_client_event = "raw" not in request.args pagination_config = PaginationConfig.from_request(request) handler = self.handlers.message_handler content = yield handler.snapshot_all_rooms( user_id=user.to_string(), pagin_config=pagination_config, - feedback=with_feedback) + feedback=with_feedback, + as_client_event=as_client_event + ) defer.returnValue((200, content)) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index e40773758a..48bba2a5f3 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -246,7 +246,7 @@ class JoinRoomAliasServlet(RestServlet): } ) - defer.returnValue((200, {})) + defer.returnValue((200, {"room_id": identifier.to_string()})) @defer.inlineCallbacks def on_PUT(self, request, room_identifier, txn_id): @@ -314,12 +314,15 @@ class RoomMessageListRestServlet(RestServlet): request, default_limit=10, ) with_feedback = "feedback" in request.args + as_client_event = "raw" not in request.args handler = self.handlers.message_handler msgs = yield handler.get_messages( room_id=room_id, user_id=user.to_string(), pagin_config=pagination_config, - feedback=with_feedback) + feedback=with_feedback, + as_client_event=as_client_event + ) defer.returnValue((200, msgs)) diff --git a/synapse/server.py b/synapse/server.py index c3bf46abbf..d861efd2fd 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -149,8 +149,8 @@ class BaseHomeServer(object): object.""" return EventID.from_string(s) - def serialize_event(self, e): - return serialize_event(self, e) + def serialize_event(self, e, as_client_event=True): + return serialize_event(self, e, as_client_event) def get_ip_from_request(self, request): # May be an X-Forwarding-For header depending on config diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 978b2c4a48..6542f8e4f8 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -58,13 +58,6 @@ class RoomStore(SQLBaseStore): logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") - def store_room_config(self, room_id, visibility): - return self._simple_update_one( - table=RoomsTable.table_name, - keyvalues={"room_id": room_id}, - updatevalues={"is_public": visibility} - ) - def get_room(self, room_id): """Retrieve a room. diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5327517704..71db16d0e5 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -78,12 +78,6 @@ class StateStore(SQLBaseStore): f, ) - def store_state_groups(self, event): - return self.runInteraction( - "store_state_groups", - self._store_state_groups_txn, event - ) - def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: return diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index bedc3c6c52..8ac2adab05 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,6 +39,8 @@ from ._base import SQLBaseStore from synapse.api.errors import SynapseError from synapse.util.logutils import log_function +from collections import namedtuple + import logging @@ -52,91 +54,79 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" -def _parse_stream_token(string): - try: - if string[0] != 's': - raise - return int(string[1:]) - except: - raise SynapseError(400, "Invalid token") - - -def _parse_topological_token(string): - try: - if string[0] != 't': - raise - parts = string[1:].split('-', 1) - return (int(parts[0]), int(parts[1])) - except: - raise SynapseError(400, "Invalid token") - - -def is_stream_token(string): - try: - _parse_stream_token(string) - return True - except: - return False - - -def is_topological_token(string): - try: - _parse_topological_token(string) - return True - except: - return False - - -def _get_token_bound(token, comparison): - try: - s = _parse_stream_token(token) - return "%s %s %d" % ("stream_ordering", comparison, s) - except: - pass - - try: - top, stream = _parse_topological_token(token) - return "%s %s %d AND %s %s %d" % ( - "topological_ordering", comparison, top, - "stream_ordering", comparison, stream, - ) - except: - pass - - raise SynapseError(400, "Invalid token") - - -class StreamStore(SQLBaseStore): - @log_function - def get_room_events(self, user_id, from_key, to_key, room_id, limit=0, - direction='f', with_feedback=False): - # We deal with events request in two different ways depending on if - # this looks like an /events request or a pagination request. - is_events = ( - direction == 'f' - and user_id - and is_stream_token(from_key) - and to_key and is_stream_token(to_key) - ) +class _StreamToken(namedtuple("_StreamToken", "topological stream")): + """Tokens are positions between events. The token "s1" comes after event 1. + + s0 s1 + | | + [0] V [1] V [2] + + Tokens can either be a point in the live event stream or a cursor going + through historic events. + + When traversing the live event stream events are ordered by when they + arrived at the homeserver. + + When traversing historic events the events are ordered by their depth in + the event graph "topological_ordering" and then by when they arrived at the + homeserver "stream_ordering". + + Live tokens start with an "s" followed by the "stream_ordering" id of the + event it comes after. Historic tokens start with a "t" followed by the + "topological_ordering" id of the event it comes after, follewed by "-", + followed by the "stream_ordering" id of the event it comes after. + """ + __slots__ = [] + + @classmethod + def parse(cls, string): + try: + if string[0] == 's': + return cls(None, int(string[1:])) + if string[0] == 't': + parts = string[1:].split('-', 1) + return cls(int(parts[1]), int(parts[0])) + except: + pass + raise SynapseError(400, "Invalid token %r" % (string,)) + + @classmethod + def parse_stream_token(cls, string): + try: + if string[0] == 's': + return cls(None, int(string[1:])) + except: + pass + raise SynapseError(400, "Invalid token %r" % (string,)) + + def __str__(self): + if self.topological is not None: + return "t%d-%d" % (self.topological, self.stream) + else: + return "s%d" % (self.stream,) - if is_events: - return self.get_room_events_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - room_id=room_id, - limit=limit, - with_feedback=with_feedback, + def lower_bound(self): + if self.topological is None: + return "(%d < %s)" % (self.stream, "stream_ordering") + else: + return "(%d < %s OR (%d == %s AND %d < %s))" % ( + self.topological, "topological_ordering", + self.topological, "topological_ordering", + self.stream, "stream_ordering", ) + + def upper_bound(self): + if self.topological is None: + return "(%d >= %s)" % (self.stream, "stream_ordering") else: - return self.paginate_room_events( - from_key=from_key, - to_key=to_key, - room_id=room_id, - limit=limit, - with_feedback=with_feedback, + return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + self.topological, "topological_ordering", + self.topological, "topological_ordering", + self.stream, "stream_ordering", ) + +class StreamStore(SQLBaseStore): @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): @@ -162,8 +152,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_id = _parse_stream_token(from_key) - to_id = _parse_stream_token(to_key) + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: return defer.succeed(([], to_key)) @@ -181,7 +171,7 @@ class StreamStore(SQLBaseStore): } def f(txn): - txn.execute(sql, (user_id, user_id, from_id, to_id,)) + txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,)) rows = self.cursor_to_dict(txn) @@ -211,17 +201,21 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - from_comp = '<=' if direction == 'b' else '>' - to_comp = '>' if direction == 'b' else '<=' - order = "DESC" if direction == 'b' else "ASC" - args = [room_id] - - bounds = _get_token_bound(from_key, from_comp) - if to_key: - bounds = "%s AND %s" % ( - bounds, _get_token_bound(to_key, to_comp) - ) + if direction == 'b': + order = "DESC" + bounds = _StreamToken.parse(from_key).upper_bound() + if to_key: + bounds = "%s AND %s" % ( + bounds, _StreamToken.parse(to_key).lower_bound() + ) + else: + order = "ASC" + bounds = _StreamToken.parse(from_key).lower_bound() + if to_key: + bounds = "%s AND %s" % ( + bounds, _StreamToken.parse(to_key).upper_bound() + ) if int(limit) > 0: args.append(int(limit)) @@ -249,9 +243,13 @@ class StreamStore(SQLBaseStore): topo = rows[-1]["topological_ordering"] toke = rows[-1]["stream_ordering"] if direction == 'b': - topo -= 1 + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. toke -= 1 - next_token = "t%s-%s" % (topo, toke) + next_token = str(_StreamToken(topo, toke)) else: # TODO (erikj): We should work out what to do here instead. next_token = to_key if to_key else from_key @@ -284,9 +282,14 @@ class StreamStore(SQLBaseStore): rows.reverse() # As we selected with reverse ordering if rows: + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # since we are going backwards so we subtract one from the + # stream part. topo = rows[0]["topological_ordering"] - toke = rows[0]["stream_ordering"] - start_token = "t%s-%s" % (topo, toke) + toke = rows[0]["stream_ordering"] - 1 + start_token = str(_StreamToken(topo, toke)) token = (start_token, end_token) else: |