diff options
Diffstat (limited to 'synapse')
34 files changed, 1628 insertions, 516 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 2473a2b2bb..54ecbe5b3a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -17,9 +17,10 @@ from twisted.internet import defer -from synapse.api.constants import Membership +from synapse.api.constants import Membership, JoinRules from synapse.api.errors import AuthError, StoreError, Codes from synapse.api.events.room import RoomMemberEvent +from synapse.util.logutils import log_function import logging @@ -44,16 +45,29 @@ class Auth(object): """ try: if hasattr(event, "room_id"): + is_state = hasattr(event, "state_key") + if event.type == RoomMemberEvent.TYPE: + yield self._can_replace_state(event) allowed = yield self.is_membership_change_allowed(event) defer.returnValue(allowed) + return + + self._check_joined_room( + member=snapshot.membership_state, + user_id=snapshot.user_id, + room_id=snapshot.room_id, + ) + + if is_state: + # TODO (erikj): This really only should be called for *new* + # state + yield self._can_add_state(event) + yield self._can_replace_state(event) else: - self._check_joined_room( - member=snapshot.membership_state, - user_id=snapshot.user_id, - room_id=snapshot.room_id, - ) - defer.returnValue(True) + yield self._can_send_event(event) + + defer.returnValue(True) else: raise AuthError(500, "Unknown event: %s" % event) except AuthError as e: @@ -111,7 +125,14 @@ class Auth(object): membership = event.content["membership"] + join_rule = yield self.store.get_room_join_rule(event.room_id) + if not join_rule: + join_rule = JoinRules.INVITE + if Membership.INVITE == membership: + # TODO (erikj): We should probably handle this more intelligently + # PRIVATE join rules. + # Invites are valid iff caller is in the room and target isn't. if not caller_in_room: # caller isn't joined raise AuthError(403, "You are not in room %s." % event.room_id) @@ -124,18 +145,54 @@ class Auth(object): # joined: It's a NOOP if event.user_id != target_user_id: raise AuthError(403, "Cannot force another user to join.") - elif room.is_public: - pass # anyone can join public rooms. - elif (not caller or caller.membership not in - [Membership.INVITE, Membership.JOIN]): - raise AuthError(403, "You are not invited to this room.") + elif join_rule == JoinRules.PUBLIC or room.is_public: + pass + elif join_rule == JoinRules.INVITE: + if ( + not caller or caller.membership not in + [Membership.INVITE, Membership.JOIN] + ): + raise AuthError(403, "You are not invited to this room.") + else: + # TODO (erikj): may_join list + # TODO (erikj): private rooms + raise AuthError(403, "You are not allowed to join this room") elif Membership.LEAVE == membership: + # TODO (erikj): Implement kicks. + if not caller_in_room: # trying to leave a room you aren't joined raise AuthError(403, "You are not in room %s." % event.room_id) elif target_user_id != event.user_id: - # trying to force another user to leave - raise AuthError(403, "Cannot force %s to leave." % - target_user_id) + user_level = yield self.store.get_power_level( + event.room_id, + event.user_id, + ) + _, kick_level = yield self.store.get_ops_levels(event.room_id) + + if kick_level: + kick_level = int(kick_level) + else: + kick_level = 5 + + if user_level < kick_level: + raise AuthError( + 403, "You cannot kick user %s." % target_user_id + ) + elif Membership.BAN == membership: + user_level = yield self.store.get_power_level( + event.room_id, + event.user_id, + ) + + ban_level, _ = yield self.store.get_ops_levels(event.room_id) + + if ban_level: + ban_level = int(ban_level) + else: + ban_level = 5 # FIXME (erikj): What should we do here? + + if user_level < ban_level: + raise AuthError(403, "You don't have permission to ban") else: raise AuthError(500, "Unknown membership %s" % membership) @@ -176,3 +233,85 @@ class Auth(object): except StoreError: raise AuthError(403, "Unrecognised access token.", errcode=Codes.UNKNOWN_TOKEN) + + @defer.inlineCallbacks + @log_function + def _can_send_event(self, event): + send_level = yield self.store.get_send_event_level(event.room_id) + + if send_level: + send_level = int(send_level) + else: + send_level = 0 + + user_level = yield self.store.get_power_level( + event.room_id, + event.user_id, + ) + + if user_level: + user_level = int(user_level) + else: + user_level = 0 + + if user_level < send_level: + raise AuthError( + 403, "You don't have permission to post to the room" + ) + + defer.returnValue(True) + + @defer.inlineCallbacks + def _can_add_state(self, event): + add_level = yield self.store.get_add_state_level(event.room_id) + + if not add_level: + defer.returnValue(True) + + add_level = int(add_level) + + user_level = yield self.store.get_power_level( + event.room_id, + event.user_id, + ) + + user_level = int(user_level) + + if user_level < add_level: + raise AuthError( + 403, "You don't have permission to add state to the room" + ) + + defer.returnValue(True) + + @defer.inlineCallbacks + def _can_replace_state(self, event): + current_state = yield self.store.get_current_state( + event.room_id, + event.type, + event.state_key, + ) + + if current_state: + current_state = current_state[0] + + user_level = yield self.store.get_power_level( + event.room_id, + event.user_id, + ) + + if user_level: + user_level = int(user_level) + else: + user_level = 0 + + logger.debug("Checking power level for %s, %s", event.user_id, user_level) + if current_state and hasattr(current_state, "required_power_level"): + req = current_state.required_power_level + + logger.debug("Checked power level for %s, %s", event.user_id, req) + if user_level < req: + raise AuthError( + 403, + "You don't have permission to change that state" + ) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index f69f2445a2..668ffa07ca 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -23,7 +23,8 @@ class Membership(object): JOIN = u"join" KNOCK = u"knock" LEAVE = u"leave" - LIST = (INVITE, JOIN, KNOCK, LEAVE) + BAN = u"ban" + LIST = (INVITE, JOIN, KNOCK, LEAVE, BAN) class Feedback(object): @@ -42,3 +43,10 @@ class PresenceState(object): UNAVAILABLE = u"unavailable" ONLINE = u"online" FREE_FOR_CHAT = u"free_for_chat" + + +class JoinRules(object): + PUBLIC = u"public" + KNOCK = u"knock" + INVITE = u"invite" + PRIVATE = u"private" diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index f9653e0b2a..9502f5df8f 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -42,6 +42,7 @@ class SynapseEvent(JsonEncodedObject): "user_id", # sender/initiator "content", # HTTP body, JSON "state_key", + "required_power_level", ] internal_keys = [ @@ -52,6 +53,7 @@ class SynapseEvent(JsonEncodedObject): "destinations", "origin", "outlier", + "power_level", ] required_keys = [ @@ -152,3 +154,10 @@ class SynapseEvent(JsonEncodedObject): msg = self._check_json(entry, template[key][0]) if msg: return msg + + +class SynapseStateEvent(SynapseEvent): + def __init__(self, **kwargs): + if "state_key" not in kwargs: + kwargs["state_key"] = "" + super(SynapseStateEvent, self).__init__(**kwargs) diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index c2cdcddf41..159728b2d2 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -16,6 +16,8 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent, + RoomPowerLevelsEvent, RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, + RoomCreateEvent, RoomAddStateLevelEvent, RoomSendEventLevelEvent ) from synapse.util.stringutils import random_string @@ -30,7 +32,13 @@ class EventFactory(object): RoomMemberEvent, FeedbackEvent, InviteJoinEvent, - RoomConfigEvent + RoomConfigEvent, + RoomPowerLevelsEvent, + RoomJoinRulesEvent, + RoomCreateEvent, + RoomAddStateLevelEvent, + RoomSendEventLevelEvent, + RoomOpsPowerLevelsEvent, ] def __init__(self, hs): diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index 9faad57ac0..f6d3c59a9a 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -15,7 +15,7 @@ from synapse.api.constants import Feedback, Membership from synapse.api.errors import SynapseError -from . import SynapseEvent +from . import SynapseEvent, SynapseStateEvent class GenericEvent(SynapseEvent): @@ -132,3 +132,45 @@ class RoomConfigEvent(SynapseEvent): def get_content_template(self): return {} + + +class RoomCreateEvent(SynapseStateEvent): + TYPE = "m.room.create" + + def get_content_template(self): + return {} + + +class RoomJoinRulesEvent(SynapseStateEvent): + TYPE = "m.room.join_rules" + + def get_content_template(self): + return {} + + +class RoomPowerLevelsEvent(SynapseStateEvent): + TYPE = "m.room.power_levels" + + def get_content_template(self): + return {} + + +class RoomAddStateLevelEvent(SynapseStateEvent): + TYPE = "m.room.add_state_level" + + def get_content_template(self): + return {} + + +class RoomSendEventLevelEvent(SynapseStateEvent): + TYPE = "m.room.send_event_level" + + def get_content_template(self): + return {} + + +class RoomOpsPowerLevelsEvent(SynapseStateEvent): + TYPE = "m.room.ops_levels" + + def get_content_template(self): + return {} diff --git a/synapse/api/urls.py b/synapse/api/urls.py index 05ca000787..3d0b5de965 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -15,7 +15,7 @@ """Contains the URL paths to prefix various aspects of the server with. """ -CLIENT_PREFIX = "/matrix/client/api/v1" -FEDERATION_PREFIX = "/matrix/federation/v1" -WEB_CLIENT_PREFIX = "/matrix/client" -CONTENT_REPO_PREFIX = "/matrix/content" \ No newline at end of file +CLIENT_PREFIX = "/_matrix/client/api/v1" +FEDERATION_PREFIX = "/_matrix/federation/v1" +WEB_CLIENT_PREFIX = "/_matrix/client" +CONTENT_REPO_PREFIX = "/_matrix/content" \ No newline at end of file diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 6d292ccf9a..606c9c650d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,7 +20,6 @@ from synapse.server import HomeServer from twisted.internet import reactor from twisted.enterprise import adbapi -from twisted.python.log import PythonLoggingObserver from twisted.web.resource import Resource from twisted.web.static import File from twisted.web.server import Site @@ -29,16 +28,17 @@ from synapse.http.client import TwistedHttpClient from synapse.api.urls import ( CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX ) +from synapse.config.homeserver import HomeServerConfig +from synapse.crypto import context_factory from daemonize import Daemonize import twisted.manhole.telnet -import argparse import logging -import logging.config import sqlite3 import os import re +import sys logger = logging.getLogger(__name__) @@ -56,13 +56,13 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 1 +SCHEMA_VERSION = 2 class SynapseHomeServer(HomeServer): def build_http_client(self): - return TwistedHttpClient() + return TwistedHttpClient(self) def build_resource_for_client(self): return JsonResource() @@ -206,37 +206,17 @@ class SynapseHomeServer(HomeServer): """ return "%s-%s" % (resource, path_seg) - def start_listening(self, port): - reactor.listenTCP(port, Site(self.root_resource)) - logger.info("Synapse now listening on port %d", port) - - -def setup_logging(verbosity=0, filename=None, config_path=None): - """ Sets up logging with verbosity levels. - - Args: - verbosity: The verbosity level. - filename: Log to the given file rather than to the console. - config_path: Path to a python logging config file. - """ - - if config_path is None: - log_format = ( - '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s' - ) - - level = logging.INFO - if verbosity: - level = logging.DEBUG - - # FIXME: we need a logging.WARN for a -q quiet option - - logging.basicConfig(level=level, filename=filename, format=log_format) - else: - logging.config.fileConfig(config_path) - - observer = PythonLoggingObserver() - observer.start() + def start_listening(self, secure_port, unsecure_port): + if secure_port is not None: + reactor.listenSSL( + secure_port, Site(self.root_resource), self.tls_context_factory + ) + logger.info("Synapse now listening on port %d", secure_port) + if unsecure_port is not None: + reactor.listenTCP( + unsecure_port, Site(self.root_resource) + ) + logger.info("Synapse now listening on port %d", unsecure_port) def run(): @@ -244,78 +224,53 @@ def run(): def setup(): - parser = argparse.ArgumentParser() - parser.add_argument("-p", "--port", dest="port", type=int, default=8080, - help="The port to listen on.") - parser.add_argument("-d", "--database", dest="db", default="homeserver.db", - help="The database name.") - parser.add_argument("-H", "--host", dest="host", default="localhost", - help="The hostname of the server.") - parser.add_argument('-v', '--verbose', dest="verbose", action='count', - help="The verbosity level.") - parser.add_argument('-f', '--log-file', dest="log_file", default=None, - help="File to log to.") - parser.add_argument('--log-config', dest="log_config", default=None, - help="Python logging config") - parser.add_argument('-D', '--daemonize', action='store_true', - default=False, help="Daemonize the home server") - parser.add_argument('--pid-file', dest="pid", help="When running as a " - "daemon, the file to store the pid in", - default="hs.pid") - parser.add_argument("-W", "--webclient", dest="webclient", default=True, - action="store_false", help="Don't host a web client.") - parser.add_argument("--manhole", dest="manhole", type=int, default=None, - help="Turn on the twisted telnet manhole service.") - args = parser.parse_args() - - verbosity = int(args.verbose) if args.verbose else None - - # Because if/when we daemonize we change to root dir. - db_name = os.path.abspath(args.db) - log_file = args.log_file - if log_file: - log_file = os.path.abspath(log_file) - - setup_logging( - verbosity=verbosity, - filename=log_file, - config_path=args.log_config, + config = HomeServerConfig.load_config( + "Synapse Homeserver", + sys.argv[1:], + generate_section="Homeserver" ) - logger.info("Server hostname: %s", args.host) + config.setup_logging() - if re.search(":[0-9]+$", args.host): - domain_with_port = args.host + logger.info("Server hostname: %s", config.server_name) + + if re.search(":[0-9]+$", config.server_name): + domain_with_port = config.server_name else: - domain_with_port = "%s:%s" % (args.host, args.port) + domain_with_port = "%s:%s" % (config.server_name, config.bind_port) + + tls_context_factory = context_factory.ServerContextFactory(config) hs = SynapseHomeServer( - args.host, + config.server_name, domain_with_port=domain_with_port, upload_dir=os.path.abspath("uploads"), - db_name=db_name, + db_name=config.database_path, + tls_context_factory=tls_context_factory, ) hs.register_servlets() hs.create_resource_tree( - web_client=args.webclient, - redirect_root_to_web_client=True) - hs.start_listening(args.port) + web_client=config.webclient, + redirect_root_to_web_client=True, + ) + hs.start_listening(config.bind_port, config.unsecure_port) hs.get_db_pool() - if args.manhole: + if config.manhole: f = twisted.manhole.telnet.ShellFactory() f.username = "matrix" f.password = "rabbithole" f.namespace['hs'] = hs - reactor.listenTCP(args.manhole, f, interface='127.0.0.1') + reactor.listenTCP(config.manhole, f, interface='127.0.0.1') - if args.daemonize: + if config.daemonize: + print config.pid_file daemon = Daemonize( app="synapse-homeserver", - pid=args.pid, + pid=config.pid_file, action=run, auto_close_fds=False, verbose=True, diff --git a/synapse/config/__init__.py b/synapse/config/__init__.py new file mode 100644 index 0000000000..fe8a073cd3 --- /dev/null +++ b/synapse/config/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/config/_base.py b/synapse/config/_base.py new file mode 100644 index 0000000000..1913179c3a --- /dev/null +++ b/synapse/config/_base.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import ConfigParser as configparser +import argparse +import sys +import os +import yaml + + +class ConfigError(Exception): + pass + + +class Config(object): + def __init__(self, args): + pass + + @staticmethod + def abspath(file_path): + return os.path.abspath(file_path) if file_path else file_path + + @classmethod + def check_file(cls, file_path, config_name): + if file_path is None: + raise ConfigError( + "Missing config for %s." + " Try running again with --generate-config" + % (config_name,) + ) + if not os.path.exists(file_path): + raise ConfigError( + "File % config for %s doesn't exist." + " Try running again with --generate-config" + % (config_name,) + ) + return cls.abspath(file_path) + + @classmethod + def read_file(cls, file_path, config_name): + cls.check_file(file_path, config_name) + with open(file_path) as file_stream: + return file_stream.read() + + @staticmethod + def read_config_file(file_path): + with open(file_path) as file_stream: + return yaml.load(file_stream) + + @classmethod + def add_arguments(cls, parser): + pass + + @classmethod + def generate_config(cls, args, config_dir_path): + pass + + @classmethod + def load_config(cls, description, argv, generate_section=None): + config_parser = argparse.ArgumentParser(add_help=False) + config_parser.add_argument( + "-c", "--config-path", + metavar="CONFIG_FILE", + help="Specify config file" + ) + config_parser.add_argument( + "--generate-config", + action="store_true", + help="Generate config file" + ) + config_args, remaining_args = config_parser.parse_known_args(argv) + + if config_args.generate_config: + if not config_args.config_path: + config_parser.error( + "Must specify where to generate the config file" + ) + config_dir_path = os.path.dirname(config_args.config_path) + if os.path.exists(config_args.config_path): + defaults = cls.read_config_file(config_args.config_path) + else: + defaults = {} + else: + if config_args.config_path: + defaults = cls.read_config_file(config_args.config_path) + else: + defaults = {} + + parser = argparse.ArgumentParser( + parents=[config_parser], + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + cls.add_arguments(parser) + parser.set_defaults(**defaults) + + args = parser.parse_args(remaining_args) + + if config_args.generate_config: + config_dir_path = os.path.dirname(config_args.config_path) + config_dir_path = os.path.abspath(config_dir_path) + if not os.path.exists(config_dir_path): + os.makedirs(config_dir_path) + cls.generate_config(args, config_dir_path) + config = {} + for key, value in vars(args).items(): + if (key not in set(["config_path", "generate_config"]) + and value is not None): + config[key] = value + with open(config_args.config_path, "w") as config_file: + yaml.dump(config, config_file, default_flow_style=False) + sys.exit(0) + + return cls(args) + + + diff --git a/synapse/config/database.py b/synapse/config/database.py new file mode 100644 index 0000000000..edf2361914 --- /dev/null +++ b/synapse/config/database.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config +import os + +class DatabaseConfig(Config): + def __init__(self, args): + super(DatabaseConfig, self).__init__(args) + self.database_path = self.abspath(args.database_path) + + @classmethod + def add_arguments(cls, parser): + super(DatabaseConfig, cls).add_arguments(parser) + db_group = parser.add_argument_group("database") + db_group.add_argument( + "-d", "--database-path", default="homeserver.db", + help="The database name." + ) + + @classmethod + def generate_config(cls, args, config_dir_path): + super(DatabaseConfig, cls).generate_config(args, config_dir_path) + args.database_path = os.path.abspath(args.database_path) + diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py new file mode 100644 index 0000000000..18072e3196 --- /dev/null +++ b/synapse/config/homeserver.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .tls import TlsConfig +from .server import ServerConfig +from .logger import LoggingConfig +from .database import DatabaseConfig + +class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig): + pass + +if __name__=='__main__': + import sys + HomeServerConfig.load_config("Generate config", sys.argv[1:], "HomeServer") diff --git a/synapse/config/logger.py b/synapse/config/logger.py new file mode 100644 index 0000000000..8db6621ae8 --- /dev/null +++ b/synapse/config/logger.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + +from twisted.python.log import PythonLoggingObserver +import logging +import logging.config + +class LoggingConfig(Config): + def __init__(self, args): + super(LoggingConfig, self).__init__(args) + self.verbosity = int(args.verbose) if args.verbose else None + self.log_config = self.abspath(args.log_config) + self.log_file = self.abspath(args.log_file) + + @classmethod + def add_arguments(cls, parser): + super(LoggingConfig, cls).add_arguments(parser) + logging_group = parser.add_argument_group("logging") + logging_group.add_argument( + '-v', '--verbose', dest="verbose", action='count', + help="The verbosity level." + ) + logging_group.add_argument( + '-f', '--log-file', dest="log_file", default=None, + help="File to log to." + ) + logging_group.add_argument( + '--log-config', dest="log_config", default=None, + help="Python logging config file" + ) + + def setup_logging(self): + log_format = ( + '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s' + ) + if self.log_config is None: + + level = logging.INFO + if self.verbosity: + level = logging.DEBUG + + # FIXME: we need a logging.WARN for a -q quiet option + + logging.basicConfig( + level=level, + filename=self.log_file, + format=log_format + ) + else: + logging.config.fileConfig(self.log_config) + + observer = PythonLoggingObserver() + observer.start() diff --git a/synapse/config/server.py b/synapse/config/server.py new file mode 100644 index 0000000000..36143e3c9c --- /dev/null +++ b/synapse/config/server.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import nacl.signing +import os +from ._base import Config +from syutil.base64util import encode_base64, decode_base64 + + +class ServerConfig(Config): + def __init__(self, args): + super(ServerConfig, self).__init__(args) + self.server_name = args.server_name + self.signing_key = self.read_signing_key(args.signing_key_path) + self.bind_port = args.bind_port + self.bind_host = args.bind_host + self.unsecure_port = args.unsecure_port + self.daemonize = args.daemonize + self.pid_file = self.abspath(args.pid_file) + self.webclient = True + self.manhole = args.manhole + + @classmethod + def add_arguments(cls, parser): + super(ServerConfig, cls).add_arguments(parser) + server_group = parser.add_argument_group("server") + server_group.add_argument("-H", "--server-name", default="localhost", + help="The name of the server") + server_group.add_argument("--signing-key-path", + help="The signing key to sign messages with") + server_group.add_argument("-p", "--bind-port", metavar="PORT", + type=int, help="https port to listen on", + default=8448) + server_group.add_argument("--unsecure-port", metavar="PORT", + type=int, help="http port to listen on", + default=8008) + server_group.add_argument("--bind-host", default="", + help="Local interface to listen on") + server_group.add_argument("-D", "--daemonize", action='store_true', + help="Daemonize the home server") + server_group.add_argument('--pid-file', default="hs.pid", + help="When running as a daemon, the file to" + " store the pid in") + server_group.add_argument("--manhole", metavar="PORT", dest="manhole", + type=int, + help="Turn on the twisted telnet manhole" + " service on the given port.") + + def read_signing_key(self, signing_key_path): + signing_key_base64 = self.read_file(signing_key_path, "signing_key") + signing_key_bytes = decode_base64(signing_key_base64) + return nacl.signing.SigningKey(signing_key_bytes) + + @classmethod + def generate_config(cls, args, config_dir_path): + super(ServerConfig, cls).generate_config(args, config_dir_path) + base_key_name = os.path.join(config_dir_path, args.server_name) + + args.pid_file = os.path.abspath(args.pid_file) + + if not args.signing_key_path: + args.signing_key_path = base_key_name + ".signing.key" + + if not os.path.exists(args.signing_key_path): + with open(args.signing_key_path, "w") as signing_key_file: + key = nacl.signing.SigningKey.generate() + signing_key_file.write(encode_base64(key.encode())) diff --git a/synapse/config/tls.py b/synapse/config/tls.py new file mode 100644 index 0000000000..16f6f3aba6 --- /dev/null +++ b/synapse/config/tls.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + +from OpenSSL import crypto +import subprocess +import os + +GENERATE_DH_PARAMS=False + + +class TlsConfig(Config): + def __init__(self, args): + super(TlsConfig, self).__init__(args) + self.tls_certificate = self.read_tls_certificate( + args.tls_certificate_path + ) + self.tls_private_key = self.read_tls_private_key( + args.tls_private_key_path + ) + self.tls_dh_params_path = self.check_file( + args.tls_dh_params_path, "tls_dh_params" + ) + + @classmethod + def add_arguments(cls, parser): + super(TlsConfig, cls).add_arguments(parser) + tls_group = parser.add_argument_group("tls") + tls_group.add_argument("--tls-certificate-path", + help="PEM encoded X509 certificate for TLS") + tls_group.add_argument("--tls-private-key-path", + help="PEM encoded private key for TLS") + tls_group.add_argument("--tls-dh-params-path", + help="PEM dh parameters for ephemeral keys") + + def read_tls_certificate(self, cert_path): + cert_pem = self.read_file(cert_path, "tls_certificate") + return crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + + def read_tls_private_key(self, private_key_path): + private_key_pem = self.read_file(private_key_path, "tls_private_key") + return crypto.load_privatekey(crypto.FILETYPE_PEM, private_key_pem) + + @classmethod + def generate_config(cls, args, config_dir_path): + super(TlsConfig, cls).generate_config(args, config_dir_path) + base_key_name = os.path.join(config_dir_path, args.server_name) + + if args.tls_certificate_path is None: + args.tls_certificate_path = base_key_name + ".tls.crt" + + if args.tls_private_key_path is None: + args.tls_private_key_path = base_key_name + ".tls.key" + + if args.tls_dh_params_path is None: + args.tls_dh_params_path = base_key_name + ".tls.dh" + + if not os.path.exists(args.tls_private_key_path): + with open(args.tls_private_key_path, "w") as private_key_file: + tls_private_key = crypto.PKey() + tls_private_key.generate_key(crypto.TYPE_RSA, 2048) + private_key_pem = crypto.dump_privatekey( + crypto.FILETYPE_PEM, tls_private_key + ) + private_key_file.write(private_key_pem) + else: + with open(args.tls_private_key_path) as private_key_file: + private_key_pem = private_key_file.read() + tls_private_key = crypto.load_privatekey( + crypto.FILETYPE_PEM, private_key_pem + ) + + if not os.path.exists(args.tls_certificate_path): + with open(args.tls_certificate_path, "w") as certifcate_file: + cert = crypto.X509() + subject = cert.get_subject() + subject.CN = args.server_name + + cert.set_serial_number(1000) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(tls_private_key) + + cert.sign(tls_private_key, 'sha256') + + cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) + + certifcate_file.write(cert_pem) + + if not os.path.exists(args.tls_dh_params_path): + if GENERATE_DH_PARAMS: + subprocess.check_call([ + "openssl", "dhparam", + "-outform", "PEM", + "-out", args.tls_dh_params_path, + "2048" + ]) + else: + with open(args.tls_dh_params_path, "w") as dh_params_file: + dh_params_file.write( + "2048-bit DH parameters taken from rfc3526\n" + "-----BEGIN DH PARAMETERS-----\n" + "MIIBCAKCAQEA///////////JD9qiIWjC" + "NMTGYouA3BzRKQJOCIpnzHQCC76mOxOb\n" + "IlFKCHmONATd75UZs806QxswKwpt8l8U" + "N0/hNW1tUcJF5IW1dmJefsb0TELppjft\n" + "awv/XLb0Brft7jhr+1qJn6WunyQRfEsf" + "5kkoZlHs5Fs9wgB8uKFjvwWY2kg2HFXT\n" + "mmkWP6j9JM9fg2VdI9yjrZYcYvNWIIVS" + "u57VKQdwlpZtZww1Tkq8mATxdGwIyhgh\n" + "fDKQXkYuNs474553LBgOhgObJ4Oi7Aei" + "j7XFXfBvTFLJ3ivL9pVYFxg5lUl86pVq\n" + "5RXSJhiY+gUQFXKOWoqsqmj/////////" + "/wIBAg==\n" + "-----END DH PARAMETERS-----\n" + ) diff --git a/synapse/crypto/config.py b/synapse/crypto/config.py deleted file mode 100644 index 2330133e71..0000000000 --- a/synapse/crypto/config.py +++ /dev/null @@ -1,160 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import ConfigParser as configparser -import argparse -import socket -import sys -import os -from OpenSSL import crypto -import nacl.signing -from syutil.base64util import encode_base64 -import subprocess - - -def load_config(description, argv): - config_parser = argparse.ArgumentParser(add_help=False) - config_parser.add_argument("-c", "--config-path", metavar="CONFIG_FILE", - help="Specify config file") - config_args, remaining_args = config_parser.parse_known_args(argv) - if config_args.config_path: - config = configparser.SafeConfigParser() - config.read([config_args.config_path]) - defaults = dict(config.items("KeyServer")) - else: - defaults = {} - parser = argparse.ArgumentParser( - parents=[config_parser], - description=description, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.set_defaults(**defaults) - parser.add_argument("--server-name", default=socket.getfqdn(), - help="The name of the server") - parser.add_argument("--signing-key-path", - help="The signing key to sign responses with") - parser.add_argument("--tls-certificate-path", - help="PEM encoded X509 certificate for TLS") - parser.add_argument("--tls-private-key-path", - help="PEM encoded private key for TLS") - parser.add_argument("--tls-dh-params-path", - help="PEM encoded dh parameters for ephemeral keys") - parser.add_argument("--bind-port", type=int, - help="TCP port to listen on") - parser.add_argument("--bind-host", default="", - help="Local interface to listen on") - - args = parser.parse_args(remaining_args) - - server_config = vars(args) - del server_config["config_path"] - return server_config - - -def generate_config(argv): - parser = argparse.ArgumentParser() - parser.add_argument("-c", "--config-path", help="Specify config file", - metavar="CONFIG_FILE", required=True) - parser.add_argument("--server-name", default=socket.getfqdn(), - help="The name of the server") - parser.add_argument("--signing-key-path", - help="The signing key to sign responses with") - parser.add_argument("--tls-certificate-path", - help="PEM encoded X509 certificate for TLS") - parser.add_argument("--tls-private-key-path", - help="PEM encoded private key for TLS") - parser.add_argument("--tls-dh-params-path", - help="PEM encoded dh parameters for ephemeral keys") - parser.add_argument("--bind-port", type=int, required=True, - help="TCP port to listen on") - parser.add_argument("--bind-host", default="", - help="Local interface to listen on") - - args = parser.parse_args(argv) - - dir_name = os.path.dirname(args.config_path) - base_key_name = os.path.join(dir_name, args.server_name) - - if args.signing_key_path is None: - args.signing_key_path = base_key_name + ".signing.key" - - if args.tls_certificate_path is None: - args.tls_certificate_path = base_key_name + ".tls.crt" - - if args.tls_private_key_path is None: - args.tls_private_key_path = base_key_name + ".tls.key" - - if args.tls_dh_params_path is None: - args.tls_dh_params_path = base_key_name + ".tls.dh" - - if not os.path.exists(args.signing_key_path): - with open(args.signing_key_path, "w") as signing_key_file: - key = nacl.signing.SigningKey.generate() - signing_key_file.write(encode_base64(key.encode())) - - if not os.path.exists(args.tls_private_key_path): - with open(args.tls_private_key_path, "w") as private_key_file: - tls_private_key = crypto.PKey() - tls_private_key.generate_key(crypto.TYPE_RSA, 2048) - private_key_pem = crypto.dump_privatekey( - crypto.FILETYPE_PEM, tls_private_key - ) - private_key_file.write(private_key_pem) - else: - with open(args.tls_private_key_path) as private_key_file: - private_key_pem = private_key_file.read() - tls_private_key = crypto.load_privatekey( - crypto.FILETYPE_PEM, private_key_pem - ) - - if not os.path.exists(args.tls_certificate_path): - with open(args.tls_certificate_path, "w") as certifcate_file: - cert = crypto.X509() - subject = cert.get_subject() - subject.CN = args.server_name - - cert.set_serial_number(1000) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) - cert.set_issuer(cert.get_subject()) - cert.set_pubkey(tls_private_key) - - cert.sign(tls_private_key, 'sha256') - - cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) - - certifcate_file.write(cert_pem) - - if not os.path.exists(args.tls_dh_params_path): - subprocess.check_call([ - "openssl", "dhparam", - "-outform", "PEM", - "-out", args.tls_dh_params_path, - "2048" - ]) - - config = configparser.SafeConfigParser() - config.add_section("KeyServer") - for key, value in vars(args).items(): - if key != "config_path": - config.set("KeyServer", key, str(value)) - - with open(args.config_path, "w") as config_file: - config.write(config_file) - - -if __name__ == "__main__": - generate_config(sys.argv[1:]) diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py new file mode 100644 index 0000000000..45958abbf5 --- /dev/null +++ b/synapse/crypto/context_factory.py @@ -0,0 +1,29 @@ +from twisted.internet import reactor, ssl +from OpenSSL import SSL +from twisted.internet._sslverify import _OpenSSLECCurve, _defaultCurveName + + +class ServerContextFactory(ssl.ContextFactory): + """Factory for PyOpenSSL SSL contexts that are used to handle incoming + connections and to make connections to remote servers.""" + + def __init__(self, config): + self._context = SSL.Context(SSL.SSLv23_METHOD) + self.configure_context(self._context, config) + + @staticmethod + def configure_context(context, config): + try: + _ecCurve = _OpenSSLECCurve(_defaultCurveName) + _ecCurve.addECKeyToContext(context) + except: + pass + context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) + context.use_certificate(config.tls_certificate) + context.use_privatekey(config.tls_private_key) + context.load_tmp_dh(config.tls_dh_params_path) + context.set_cipher_list("!ADH:HIGH+kEDH:!AECDH:HIGH+kEECDH") + + def getContext(self): + return self._context + diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 2b2f11f36a..b468f70546 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -68,6 +68,7 @@ class Pdu(JsonEncodedObject): "power_level", "prev_state_id", "prev_state_origin", + "required_power_level", ] internal_keys = [ diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3d7f97bcff..4aeb2089f5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -76,6 +76,10 @@ class MessageHandler(BaseRoomHandler): Raises: SynapseError if something went wrong. """ + # TODO(paul): Why does 'event' not have a 'user' object? + user = self.hs.parse_userid(event.user_id) + assert user.is_mine, "User must be our own: %s" % (user,) + if stamp_event: event.content["hsob_ts"] = int(self.clock.time_msec()) @@ -86,6 +90,10 @@ class MessageHandler(BaseRoomHandler): yield self._on_new_room_event(event, snapshot) + self.hs.get_handlers().presence_handler.bump_presence_active_time( + user + ) + @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, feedback=False): @@ -274,11 +282,11 @@ class MessageHandler(BaseRoomHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_token.events_key, + end_token=now_token.room_key, ) - start_token = now_token.copy_and_replace("events_key", token[0]) - end_token = now_token.copy_and_replace("events_key", token[1]) + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) d["messages"] = { "chunk": [m.get_dict() for m in messages], diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7731de85c0..9bfceda88a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,6 +52,13 @@ def partitionbool(l, func): class PresenceHandler(BaseHandler): + STATE_LEVELS = { + PresenceState.OFFLINE: 0, + PresenceState.UNAVAILABLE: 1, + PresenceState.ONLINE: 2, + PresenceState.FREE_FOR_CHAT: 3, + } + def __init__(self, hs): super(PresenceHandler, self).__init__(hs) @@ -135,7 +142,7 @@ class PresenceHandler(BaseHandler): return self._user_cachemap[user] else: statuscache = UserPresenceCache() - statuscache.update({"state": PresenceState.OFFLINE}, user) + statuscache.update({"presence": PresenceState.OFFLINE}, user) return statuscache def registered_user(self, user): @@ -143,10 +150,6 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): - defer.returnValue(True) - # return - # FIXME (erikj): This code path absolutely kills the database. - assert(observed_user.is_mine) if observer_user == observed_user: @@ -173,19 +176,24 @@ class PresenceHandler(BaseHandler): observed_user=target_user ) - if visible: - state = yield self.store.get_presence_state( - target_user.localpart - ) - else: + if not visible: raise SynapseError(404, "Presence information not visible") + state = yield self.store.get_presence_state(target_user.localpart) + if "mtime" in state: + del state["mtime"] + state["presence"] = state["state"] + + if target_user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[target_user].get_state()["last_active"] + ) else: # TODO(paul): Have remote server send us permissions set state = self._get_or_offline_usercache(target_user).get_state() - if "mtime" in state and (state["mtime"] is not None): - state["mtime_age"] = int( - self.clock.time_msec() - state.pop("mtime") + if "last_active" in state: + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") ) defer.returnValue(state) @@ -202,20 +210,33 @@ class PresenceHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's displayname") - # TODO(paul): Sanity-check 'state' if "status_msg" not in state: state["status_msg"] = None for k in state.keys(): - if k not in ("state", "status_msg"): + if k not in ("presence", "state", "status_msg"): raise SynapseError( 400, "Unexpected presence state key '%s'" % (k,) ) + # Handle legacy "state" key for now + if "state" in state: + state["presence"] = state.pop("state") + + if state["presence"] not in self.STATE_LEVELS: + raise SynapseError(400, "'%s' is not a valid presence state" % + state["presence"] + ) + logger.debug("Updating presence state of %s to %s", - target_user.localpart, state["state"]) + target_user.localpart, state["presence"]) state_to_store = dict(state) + state_to_store["state"] = state_to_store.pop("presence") + + statuscache=self._get_or_offline_usercache(target_user) + was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]] + now_level = self.STATE_LEVELS[state["presence"]] yield defer.DeferredList([ self.store.set_presence_state( @@ -226,9 +247,10 @@ class PresenceHandler(BaseHandler): ), ]) - state["mtime"] = self.clock.time_msec() + if now_level > was_level: + state["last_active"] = self.clock.time_msec() - now_online = state["state"] != PresenceState.OFFLINE + now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap if now_online and not was_polling: @@ -240,6 +262,12 @@ class PresenceHandler(BaseHandler): # we don't have to do this all the time self.changed_presencelike_data(target_user, state) + def bump_presence_active_time(self, user, now=None): + if now is None: + now = self.clock.time_msec() + + self.changed_presencelike_data(user, {"last_active": now}) + def changed_presencelike_data(self, user, state): statuscache = self._get_or_make_usercache(user) @@ -251,28 +279,27 @@ class PresenceHandler(BaseHandler): @log_function def started_user_eventstream(self, user): # TODO(paul): Use "last online" state - self.set_state(user, user, {"state": PresenceState.ONLINE}) + self.set_state(user, user, {"presence": PresenceState.ONLINE}) @log_function def stopped_user_eventstream(self, user): # TODO(paul): Save current state as "last online" state - self.set_state(user, user, {"state": PresenceState.OFFLINE}) + self.set_state(user, user, {"presence": PresenceState.OFFLINE}) @defer.inlineCallbacks def user_joined_room(self, user, room_id): - if user.is_mine: - self.push_update_to_local_and_remote( - observed_user=user, - room_ids=[room_id], - statuscache=self._get_or_offline_usercache(user), - ) + statuscache = self._get_or_make_usercache(user) - else: - self.push_update_to_clients( + # No actual update but we need to bump the serial anyway for the + # event source + self._user_cachemap_latest_serial += 1 + statuscache.update({}, serial=self._user_cachemap_latest_serial) + + self.push_update_to_local_and_remote( observed_user=user, room_ids=[room_id], - statuscache=self._get_or_offline_usercache(user), + statuscache=statuscache, ) # We also want to tell them about current presence of people. @@ -386,9 +413,9 @@ class PresenceHandler(BaseHandler): observed_user = self.hs.parse_userid(p.pop("observed_user_id")) p["observed_user"] = observed_user p.update(self._get_or_offline_usercache(observed_user).get_state()) - if "mtime" in p: - p["mtime_age"] = int( - self.clock.time_msec() - p.pop("mtime") + if "last_active" in p: + p["last_active_ago"] = int( + self.clock.time_msec() - p.pop("last_active") ) defer.returnValue(presence) @@ -457,10 +484,6 @@ class PresenceHandler(BaseHandler): def _start_polling_local(self, user, target_user): target_localpart = target_user.localpart - if not self.is_presence_visible(observer_user=user, - observed_user=target_user): - return - if target_localpart not in self._local_pushmap: self._local_pushmap[target_localpart] = set() @@ -577,21 +600,30 @@ class PresenceHandler(BaseHandler): def _push_presence_remote(self, user, destination, state=None): if state is None: state = yield self.store.get_presence_state(user.localpart) + del state["mtime"] + state["presence"] = state["state"] + + if user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[user].get_state()["last_active"] + ) yield self.distributor.fire( "collect_presencelike_data", user, state ) - if "mtime" in state: + if "last_active" in state: state = dict(state) - state["mtime_age"] = int( - self.clock.time_msec() - state.pop("mtime") + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") ) user_state = { "user_id": user.to_string(), } user_state.update(**state) + if "state" in user_state and "presence" not in user_state: + user_state["presence"] = user_state["state"] yield self.federation.send_edu( destination=destination, @@ -618,14 +650,29 @@ class PresenceHandler(BaseHandler): room_ids = yield rm_handler.get_rooms_for_user(user) if not observers and not room_ids: - break + continue state = dict(push) del state["user_id"] - if "mtime_age" in state: - state["mtime"] = int( - self.clock.time_msec() - state.pop("mtime_age") + if "presence" in state: + # all is OK + pass + elif "state" in state: + # Legacy handling + state["presence"] = state["state"] + else: + logger.warning("Received a presence 'push' EDU from %s without" + + " either a 'presence' or 'state' key", origin + ) + continue + + if "state" in state: + del state["state"] + + if "last_active_ago" in state: + state["last_active"] = int( + self.clock.time_msec() - state.pop("last_active_ago") ) statuscache = self._get_or_make_usercache(user) @@ -640,7 +687,7 @@ class PresenceHandler(BaseHandler): statuscache=statuscache, ) - if state["state"] == PresenceState.OFFLINE: + if state["presence"] == PresenceState.OFFLINE: del self._user_cachemap[user] for poll in content.get("poll", []): @@ -673,10 +720,9 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds) @defer.inlineCallbacks - def push_update_to_local_and_remote(self, observed_user, + def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], - remote_domains=[], - statuscache=None): + remote_domains=[]): localusers, remoteusers = partitionbool( users_to_push, @@ -722,6 +768,78 @@ class PresenceHandler(BaseHandler): ) +class PresenceEventSource(object): + def __init__(self, hs): + self.hs = hs + self.clock = hs.get_clock() + + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if from_key < cachemap[k].serial] + + if updates: + clock = self.clock + + latest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + return ((data, latest_serial)) + else: + return (([], presence._user_cachemap_latest_serial)) + + def get_current_key(self): + presence = self.hs.get_handlers().presence_handler + return presence._user_cachemap_latest_serial + + def get_pagination_rows(self, user, pagination_config, key): + # TODO (erikj): Does this make sense? Ordering? + + from_token = pagination_config.from_token + to_token = pagination_config.to_token + + from_key = int(from_token.presence_key) + + if to_token: + to_key = int(to_token.presence_key) + else: + to_key = -1 + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if to_key < cachemap[k].serial < from_key] + + if updates: + clock = self.clock + + earliest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + if to_token: + next_token = to_token + else: + next_token = from_token + + next_token = next_token.copy_and_replace( + "presence_key", earliest_serial + ) + return ((data, next_token)) + else: + if not to_token: + to_token = from_token.copy_and_replace( + "presence_key", 0 + ) + return (([], to_token)) + + class UserPresenceCache(object): """Store an observed user's state and status message. @@ -733,6 +851,7 @@ class UserPresenceCache(object): def update(self, state, serial): assert("mtime_age" not in state) + assert("state" not in state) self.state.update(state) # Delete keys that are now 'None' @@ -749,15 +868,21 @@ class UserPresenceCache(object): def get_state(self): # clone it so caller can't break our cache - return dict(self.state) + state = dict(self.state) + + # Legacy handling + if "presence" in state: + state["state"] = state["presence"] + + return state def make_event(self, user, clock): content = self.get_state() content["user_id"] = user.to_string() - if "mtime" in content: - content["mtime_age"] = int( - clock.time_msec() - content.pop("mtime") + if "last_active" in content: + content["last_active_ago"] = int( + clock.time_msec() - content.pop("last_active") ) return {"type": "m.presence", "content": content} diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3e41d7a46b..53aa77405c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -17,10 +17,12 @@ from twisted.internet import defer from synapse.types import UserID, RoomAlias, RoomID -from synapse.api.constants import Membership +from synapse.api.constants import Membership, JoinRules from synapse.api.errors import StoreError, SynapseError from synapse.api.events.room import ( - RoomMemberEvent, RoomConfigEvent + RoomMemberEvent, RoomCreateEvent, RoomPowerLevelsEvent, + RoomJoinRulesEvent, RoomAddStateLevelEvent, RoomTopicEvent, + RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomNameEvent, ) from synapse.util import stringutils from ._base import BaseRoomHandler @@ -62,6 +64,8 @@ class RoomCreationHandler(BaseRoomHandler): else: room_alias = None + is_public = config.get("visibility", None) == "public" + if room_id: # Ensure room_id is the correct type room_id_obj = RoomID.from_string(room_id, self.hs) @@ -71,7 +75,7 @@ class RoomCreationHandler(BaseRoomHandler): yield self.store.store_room( room_id=room_id, room_creator_user_id=user_id, - is_public=config["visibility"] == "public" + is_public=is_public ) else: # autogen room IDs and try to create it. We may clash, so just @@ -85,7 +89,7 @@ class RoomCreationHandler(BaseRoomHandler): yield self.store.store_room( room_id=gen_room_id.to_string(), room_creator_user_id=user_id, - is_public=config["visibility"] == "public" + is_public=is_public ) room_id = gen_room_id.to_string() break @@ -94,18 +98,9 @@ class RoomCreationHandler(BaseRoomHandler): if not room_id: raise StoreError(500, "Couldn't generate a room ID.") - config_event = self.event_factory.create_event( - etype=RoomConfigEvent.TYPE, - room_id=room_id, - user_id=user_id, - content=config, - ) - - snapshot = yield self.store.snapshot_room( - room_id=room_id, - user_id=user_id, - state_type=RoomConfigEvent.TYPE, - state_key="", + user = self.hs.parse_userid(user_id) + creation_events = self._create_events_for_new_room( + user, room_id, is_public=is_public ) if room_alias: @@ -115,11 +110,46 @@ class RoomCreationHandler(BaseRoomHandler): servers=[self.hs.hostname], ) - yield self.state_handler.handle_new_event(config_event, snapshot) - # store_id = persist... - federation_handler = self.hs.get_handlers().federation_handler - yield federation_handler.handle_new_event(config_event, snapshot) + + @defer.inlineCallbacks + def handle_event(event): + snapshot = yield self.store.snapshot_room( + room_id=room_id, + user_id=user_id, + ) + + logger.debug("Event: %s", event) + + yield self.state_handler.handle_new_event(event, snapshot) + yield self._on_new_room_event(event, snapshot, extra_users=[user]) + + for event in creation_events: + yield handle_event(event) + + if "name" in config: + name = config["name"] + name_event = self.event_factory.create_event( + etype=RoomNameEvent.TYPE, + room_id=room_id, + user_id=user_id, + required_power_level=5, + content={"name": name}, + ) + + yield handle_event(name_event) + + if "topic" in config: + topic = config["topic"] + topic_event = self.event_factory.create_event( + etype=RoomTopicEvent.TYPE, + room_id=room_id, + user_id=user_id, + required_power_level=5, + content={"topic": topic}, + ) + + yield handle_event(topic_event) content = {"membership": Membership.JOIN} join_event = self.event_factory.create_event( @@ -142,6 +172,62 @@ class RoomCreationHandler(BaseRoomHandler): defer.returnValue(result) + def _create_events_for_new_room(self, creator, room_id, is_public=False): + event_keys = { + "room_id": room_id, + "user_id": creator.to_string(), + "required_power_level": 10, + } + + def create(etype, **content): + return self.event_factory.create_event( + etype=etype, + content=content, + **event_keys + ) + + creation_event = create( + etype=RoomCreateEvent.TYPE, + creator=creator.to_string(), + ) + + power_levels_event = self.event_factory.create_event( + etype=RoomPowerLevelsEvent.TYPE, + content={creator.to_string(): 10, "default": 0}, + **event_keys + ) + + join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE + join_rules_event = create( + etype=RoomJoinRulesEvent.TYPE, + join_rule=join_rule, + ) + + add_state_event = create( + etype=RoomAddStateLevelEvent.TYPE, + level=10, + ) + + send_event = create( + etype=RoomSendEventLevelEvent.TYPE, + level=0, + ) + + ops = create( + etype=RoomOpsPowerLevelsEvent.TYPE, + ban_level=5, + kick_level=5, + ) + + return [ + creation_event, + power_levels_event, + join_rules_event, + add_state_event, + send_event, + ops, + ] + class RoomMemberHandler(BaseRoomHandler): # TODO(paul): This handler currently contains a messy conflation of @@ -285,6 +371,16 @@ class RoomMemberHandler(BaseRoomHandler): if do_auth: yield self.auth.check(event, snapshot, raises=True) + # If we're banning someone, set a req power level + if event.membership == Membership.BAN: + if not hasattr(event, "required_power_level") or event.required_power_level is None: + # Add some default required_power_level + user_level = yield self.store.get_power_level( + event.room_id, + event.user_id, + ) + event.required_power_level = user_level + if prev_state and prev_state.membership == event.membership: # double same action, treat this event as a NOOP. defer.returnValue({}) @@ -445,10 +541,9 @@ class RoomMemberHandler(BaseRoomHandler): host = target_user.domain destinations.append(host) - # If we are joining a remote HS, include that. - if membership == Membership.JOIN: - host = target_user.domain - destinations.append(host) + # Always include target domain + host = target_user.domain + destinations.append(host) return self._on_new_room_event( event, snapshot, extra_destinations=destinations, @@ -462,3 +557,49 @@ class RoomListHandler(BaseRoomHandler): chunk = yield self.store.get_rooms(is_public=True) # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) + + +class RoomEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + # We just ignore the key for now. + + to_key = yield self.get_current_key() + + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_key, + to_key=to_key, + room_id=None, + limit=limit, + ) + + defer.returnValue((events, end_key)) + + def get_current_key(self): + return self.store.get_room_events_max_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, pagination_config, key): + from_token = pagination_config.from_token + to_token = pagination_config.to_token + limit = pagination_config.limit + direction = pagination_config.direction + + to_key = to_token.room_key if to_token else None + + events, next_key = yield self.store.paginate_room_events( + room_id=key, + from_key=from_token.room_key, + to_key=to_key, + direction=direction, + limit=limit, + with_feedback=True + ) + + next_token = from_token.copy_and_replace("room_key", next_key) + + defer.returnValue((events, next_token)) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 9fab0ff37c..3268427ecd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -145,3 +145,17 @@ class TypingNotificationHandler(BaseHandler): typing): # TODO(paul) steal this from presence.py pass + + +class TypingNotificationEventSource(object): + def __init__(self, hs): + self.hs = hs + + def get_new_events_for_user(self, user, from_key, limit): + return ([], from_key) + + def get_current_key(self): + return 0 + + def get_pagination_rows(self, user, pagination_config, key): + return ([], pagination_config.from_token) diff --git a/synapse/http/client.py b/synapse/http/client.py index 36ba2c6591..093bdf0e3f 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -113,8 +113,9 @@ class TwistedHttpClient(HttpClient): requests. """ - def __init__(self): + def __init__(self, hs): self.agent = MatrixHttpAgent(reactor) + self.hs = hs @defer.inlineCallbacks def put_json(self, destination, path, data): @@ -177,7 +178,10 @@ class TwistedHttpClient(HttpClient): retries_left = 5 # TODO: setup and pass in an ssl_context to enable TLS - endpoint = matrix_endpoint(reactor, destination, timeout=10) + endpoint = matrix_endpoint( + reactor, destination, timeout=10, + ssl_context_factory=self.hs.tls_context_factory + ) while True: try: diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index d91500b07d..a6ebe23567 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -53,7 +53,7 @@ def matrix_endpoint(reactor, destination, ssl_context_factory=None, default_port = 8080 else: transport_endpoint = SSL4ClientEndpoint - endpoint_kw_args.update(ssl_context_factory=ssl_context_factory) + endpoint_kw_args.update(sslContextFactory=ssl_context_factory) default_port = 443 if port is None: diff --git a/synapse/http/server.py b/synapse/http/server.py index 66f966fcaa..0b87718bfa 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -325,7 +325,11 @@ class ContentRepoResource(resource.Resource): # FIXME (erikj): These should use constants. file_name = os.path.basename(fname) - url = "http://%s/matrix/content/%s" % ( + # FIXME: we can't assume what the public mounted path of the repo is + # ...plus self-signed SSL won't work to remote clients anyway + # ...and we can't assume that it's SSL anyway, as we might want to + # server it via the non-SSL listener... + url = "https://%s/_matrix/content/%s" % ( self.hs.domain_with_port, file_name ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 253f60983b..3260aa744f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -95,7 +95,7 @@ class Notifier(object): """ room_id = event.room_id - source = self.event_sources.sources["room"] + room_source = self.event_sources.sources["room"] listeners = self.rooms_to_listeners.get(room_id, set()).copy() @@ -109,13 +109,17 @@ class Notifier(object): @defer.inlineCallbacks def notify(listener): - events, end_token = yield source.get_new_events_for_user( + events, end_key = yield room_source.get_new_events_for_user( listener.user, - listener.from_token, + listener.from_token.room_key, listener.limit, ) if events: + end_token = listener.from_token.copy_and_replace( + "room_key", end_key + ) + listener.notify( self, events, listener.from_token, end_token ) @@ -135,7 +139,7 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ - source = self.event_sources.sources["presence"] + presence_source = self.event_sources.sources["presence"] listeners = set() @@ -147,13 +151,17 @@ class Notifier(object): @defer.inlineCallbacks def notify(listener): - events, end_token = yield source.get_new_events_for_user( + events, end_key = yield presence_source.get_new_events_for_user( listener.user, - listener.from_token, + listener.from_token.presence_key, listener.limit, ) if events: + end_token = listener.from_token.copy_and_replace( + "presence_key", end_key + ) + listener.notify( self, events, listener.from_token, end_token ) @@ -233,16 +241,18 @@ class Notifier(object): limit = listener.limit # TODO (erikj): DeferredList? - for source in self.event_sources.sources.values(): - stuff, new_token = yield source.get_new_events_for_user( + for name, source in self.event_sources.sources.items(): + keyname = "%s_key" % name + + stuff, new_key = yield source.get_new_events_for_user( listener.user, - from_token, + getattr(from_token, keyname), limit, ) events.extend(stuff) - from_token = new_token + from_token = from_token.copy_and_replace(keyname, new_key) end_token = from_token diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py index e013b20853..bce3943542 100644 --- a/synapse/rest/presence.py +++ b/synapse/rest/presence.py @@ -48,7 +48,11 @@ class PresenceStatusRestServlet(RestServlet): try: content = json.loads(request.content.read()) - state["state"] = content.pop("state") + # Legacy handling + if "state" in content: + state["presence"] = content.pop("state") + else: + state["presence"] = content.pop("presence") if "status_msg" in content: state["status_msg"] = content.pop("status_msg") diff --git a/synapse/rest/profile.py b/synapse/rest/profile.py index 2f010c0182..06076667c7 100644 --- a/synapse/rest/profile.py +++ b/synapse/rest/profile.py @@ -87,6 +87,27 @@ class ProfileAvatarURLRestServlet(RestServlet): return (200, {}) +class ProfileRestServlet(RestServlet): + PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)") + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + user = self.hs.parse_userid(user_id) + + displayname = yield self.handlers.profile_handler.get_displayname( + user, + ) + avatar_url = yield self.handlers.profile_handler.get_avatar_url( + user, + ) + + defer.returnValue((200, { + "displayname": displayname, + "avatar_url": avatar_url + })) + + def register_servlets(hs, http_server): ProfileDisplaynameRestServlet(hs).register(http_server) ProfileAvatarURLRestServlet(hs).register(http_server) + ProfileRestServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e8faba3eeb..aadaab06e7 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -19,6 +19,11 @@ from synapse.api.events.room import ( RoomMemberEvent, RoomTopicEvent, FeedbackEvent, # RoomConfigEvent, RoomNameEvent, + RoomJoinRulesEvent, + RoomPowerLevelsEvent, + RoomAddStateLevelEvent, + RoomSendEventLevelEvent, + RoomOpsPowerLevelsEvent, ) from synapse.util.logutils import log_function @@ -33,6 +38,7 @@ from .roommember import RoomMemberStore from .stream import StreamStore from .pdu import StatePduStore, PduStore from .transactions import TransactionStore +from .keys import KeyStore import json import logging @@ -45,7 +51,7 @@ logger = logging.getLogger(__name__) class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, - DirectoryStore): + DirectoryStore, KeyStore): def __init__(self, hs): super(DataStore, self).__init__(hs) @@ -122,13 +128,21 @@ class DataStore(RoomMemberStore, RoomStore, if event.type == RoomMemberEvent.TYPE: self._store_room_member_txn(txn, event) elif event.type == FeedbackEvent.TYPE: - self._store_feedback_txn(txn,event) -# elif event.type == RoomConfigEvent.TYPE: -# self._store_room_config_txn(txn, event) + self._store_feedback_txn(txn, event) elif event.type == RoomNameEvent.TYPE: self._store_room_name_txn(txn, event) elif event.type == RoomTopicEvent.TYPE: self._store_room_topic_txn(txn, event) + elif event.type == RoomJoinRulesEvent.TYPE: + self._store_join_rule(txn, event) + elif event.type == RoomPowerLevelsEvent.TYPE: + self._store_power_levels(txn, event) + elif event.type == RoomAddStateLevelEvent.TYPE: + self._store_add_state_level(txn, event) + elif event.type == RoomSendEventLevelEvent.TYPE: + self._store_send_event_level(txn, event) + elif event.type == RoomOpsPowerLevelsEvent.TYPE: + self._store_ops_level(txn, event) vals = { "topological_ordering": event.depth, @@ -222,7 +236,6 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(self.min_token) - def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): """Snapshot the room for an update by a user Args: diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py new file mode 100644 index 0000000000..4d19b9f641 --- /dev/null +++ b/synapse/storage/keys.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _base import SQLBaseStore + +from twisted.internet import defer + +import OpenSSL +import nacl.signing + +class KeyStore(SQLBaseStore): + """Persistence for signature verification keys and tls X.509 certificates + """ + + @defer.inlineCallbacks + def get_server_certificate(self, server_name): + """Retrieve the TLS X.509 certificate for the given server + Args: + server_name (bytes): The name of the server. + Returns: + (OpenSSL.crypto.X509): The tls certificate. + """ + tls_certificate_bytes, = yield self._simple_select_one( + table="server_tls_certificates", + keyvalues={"server_name": server_name}, + retcols=("tls_certificate",), + ) + tls_certificate = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_ASN1, tls_certificate_bytes, + ) + defer.returnValue(tls_certificate) + + def store_server_certificate(self, server_name, key_server, ts_now_ms, + tls_certificate): + """Stores the TLS X.509 certificate for the given server + Args: + server_name (bytes): The name of the server. + key_server (bytes): Where the certificate was looked up + ts_now_ms (int): The time now in milliseconds + tls_certificate (OpenSSL.crypto.X509): The X.509 certificate. + """ + tls_certificate_bytes = OpenSSL.crypto.dump_certificate( + OpenSSL.crypto.FILETYPE_ASN1, tls_certificate + ) + return self._simple_insert( + table="server_tls_certificates", + keyvalues={ + "server_name": server_name, + "key_server": key_server, + "ts_added_ms": ts_now_ms, + "tls_certificate": tls_certificate_bytes, + }, + ) + + @defer.inlineCallbacks + def get_server_verification_key(self, server_name): + """Retrieve the NACL verification key for a given server + Args: + server_name (bytes): The name of the server. + Returns: + (nacl.signing.VerifyKey): The verification key. + """ + verification_key_bytes, = yield self._simple_select_one( + table="server_signature_keys", + key_values={"server_name": server_name}, + retcols=("tls_certificate",), + ) + verification_key = nacl.signing.VerifyKey(verification_key_bytes) + defer.returnValue(verification_key) + + def store_server_verification_key(self, server_name, key_version, + key_server, ts_now_ms, verification_key): + """Stores a NACL verification key for the given server. + Args: + server_name (bytes): The name of the server. + key_version (bytes): The version of the key for the server. + key_server (bytes): Where the verification key was looked up + ts_now_ms (int): The time now in milliseconds + verification_key (nacl.signing.VerifyKey): The NACL verify key. + """ + verification_key_bytes = verification_key.encode() + return self._simple_insert( + table="server_signature_keys", + key_values={ + "server_name": server_name, + "key_version": key_version, + "key_server": key_server, + "ts_added_ms": ts_now_ms, + "verification_key": verification_key_bytes, + }, + ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index d1f1a232f8..01ae190316 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -27,6 +27,9 @@ import logging logger = logging.getLogger(__name__) +OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level")) + + class RoomStore(SQLBaseStore): @defer.inlineCallbacks @@ -129,6 +132,98 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) + @defer.inlineCallbacks + def get_room_join_rule(self, room_id): + sql = ( + "SELECT join_rule FROM room_join_rules as r " + "INNER JOIN current_state_events as c " + "ON r.event_id = c.event_id " + "WHERE c.room_id = ? " + ) + + rows = yield self._execute(None, sql, room_id) + + if len(rows) == 1: + defer.returnValue(rows[0][0]) + else: + defer.returnValue(None) + + def get_power_level(self, room_id, user_id): + return self._db_pool.runInteraction( + self._get_power_level, + room_id, user_id, + ) + + def _get_power_level(self, txn, room_id, user_id): + sql = ( + "SELECT level FROM room_power_levels as r " + "INNER JOIN current_state_events as c " + "ON r.event_id = c.event_id " + "WHERE c.room_id = ? AND r.user_id = ? " + ) + + rows = txn.execute(sql, (room_id, user_id,)).fetchall() + + if len(rows) == 1: + return rows[0][0] + + sql = ( + "SELECT level FROM room_default_levels as r " + "INNER JOIN current_state_events as c " + "ON r.event_id = c.event_id " + "WHERE c.room_id = ? " + ) + + rows = txn.execute(sql, (room_id,)).fetchall() + + if len(rows) == 1: + return rows[0][0] + else: + return None + + def get_ops_levels(self, room_id): + return self._db_pool.runInteraction( + self._get_ops_levels, + room_id, + ) + + def _get_ops_levels(self, txn, room_id): + sql = ( + "SELECT ban_level, kick_level FROM room_ops_levels as r " + "INNER JOIN current_state_events as c " + "ON r.event_id = c.event_id " + "WHERE c.room_id = ? " + ) + + rows = txn.execute(sql, (room_id,)).fetchall() + + if len(rows) == 1: + return OpsLevel(rows[0][0], rows[0][1]) + else: + return OpsLevel(None, None) + + def get_add_state_level(self, room_id): + return self._get_level_from_table("room_add_state_levels", room_id) + + def get_send_event_level(self, room_id): + return self._get_level_from_table("room_send_event_levels", room_id) + + @defer.inlineCallbacks + def _get_level_from_table(self, table, room_id): + sql = ( + "SELECT level FROM %(table)s as r " + "INNER JOIN current_state_events as c " + "ON r.event_id = c.event_id " + "WHERE c.room_id = ? " + ) % {"table": table} + + rows = yield self._execute(None, sql, room_id) + + if len(rows) == 1: + defer.returnValue(rows[0][0]) + else: + defer.returnValue(None) + def _store_room_topic_txn(self, txn, event): self._simple_insert_txn( txn, @@ -151,6 +246,92 @@ class RoomStore(SQLBaseStore): } ) + def _store_join_rule(self, txn, event): + self._simple_insert_txn( + txn, + "room_join_rules", + { + "event_id": event.event_id, + "room_id": event.room_id, + "join_rule": event.content["join_rule"], + }, + ) + + def _store_power_levels(self, txn, event): + for user_id, level in event.content.items(): + if user_id == "default": + self._simple_insert_txn( + txn, + "room_default_levels", + { + "event_id": event.event_id, + "room_id": event.room_id, + "level": level, + }, + ) + else: + self._simple_insert_txn( + txn, + "room_power_levels", + { + "event_id": event.event_id, + "room_id": event.room_id, + "user_id": user_id, + "level": level + }, + ) + + def _store_default_level(self, txn, event): + self._simple_insert_txn( + txn, + "room_default_levels", + { + "event_id": event.event_id, + "room_id": event.room_id, + "level": event.content["default_level"], + }, + ) + + def _store_add_state_level(self, txn, event): + self._simple_insert_txn( + txn, + "room_add_state_levels", + { + "event_id": event.event_id, + "room_id": event.room_id, + "level": event.content["level"], + }, + ) + + def _store_send_event_level(self, txn, event): + self._simple_insert_txn( + txn, + "room_send_event_levels", + { + "event_id": event.event_id, + "room_id": event.room_id, + "level": event.content["level"], + }, + ) + + def _store_ops_level(self, txn, event): + content = { + "event_id": event.event_id, + "room_id": event.room_id, + } + + if "kick_level" in event.content: + content["kick_level"] = event.content["kick_level"] + + if "ban_level" in event.content: + content["ban_level"] = event.content["ban_level"] + + self._simple_insert_txn( + txn, + "room_ops_levels", + content, + ) + class RoomsTable(Table): table_name = "rooms" diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index e92f21ef3b..dbefbbda31 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -96,8 +96,71 @@ CREATE TABLE IF NOT EXISTS rooms( creator TEXT ); +CREATE TABLE IF NOT EXISTS room_join_rules( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + join_rule TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS room_join_rules_event_id ON room_join_rules(event_id); +CREATE INDEX IF NOT EXISTS room_join_rules_room_id ON room_join_rules(room_id); + + +CREATE TABLE IF NOT EXISTS room_power_levels( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + user_id TEXT NOT NULL, + level INTEGER NOT NULL +); +CREATE INDEX IF NOT EXISTS room_power_levels_event_id ON room_power_levels(event_id); +CREATE INDEX IF NOT EXISTS room_power_levels_room_id ON room_power_levels(room_id); +CREATE INDEX IF NOT EXISTS room_power_levels_room_user ON room_power_levels(room_id, user_id); + + +CREATE TABLE IF NOT EXISTS room_default_levels( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + level INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS room_default_levels_event_id ON room_default_levels(event_id); +CREATE INDEX IF NOT EXISTS room_default_levels_room_id ON room_default_levels(room_id); + + +CREATE TABLE IF NOT EXISTS room_add_state_levels( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + level INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS room_add_state_levels_event_id ON room_add_state_levels(event_id); +CREATE INDEX IF NOT EXISTS room_add_state_levels_room_id ON room_add_state_levels(room_id); + + +CREATE TABLE IF NOT EXISTS room_send_event_levels( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + level INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS room_send_event_levels_event_id ON room_send_event_levels(event_id); +CREATE INDEX IF NOT EXISTS room_send_event_levels_room_id ON room_send_event_levels(room_id); + + +CREATE TABLE IF NOT EXISTS room_ops_levels( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + ban_level INTEGER, + kick_level INTEGER +); + +CREATE INDEX IF NOT EXISTS room_ops_levels_event_id ON room_ops_levels(event_id); +CREATE INDEX IF NOT EXISTS room_ops_levels_room_id ON room_ops_levels(room_id); + + CREATE TABLE IF NOT EXISTS room_hosts( room_id TEXT NOT NULL, host TEXT NOT NULL, CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE ); + +CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/keys.sql b/synapse/storage/schema/keys.sql new file mode 100644 index 0000000000..45cdbcecae --- /dev/null +++ b/synapse/storage/schema/keys.sql @@ -0,0 +1,30 @@ +/* Copyright 2014 matrix.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS server_tls_certificates( + server_name TEXT, -- Server name. + key_server TEXT, -- Which key server the certificate was fetched from. + ts_added_ms INTEGER, -- When the certifcate was added. + tls_certificate BLOB, -- DER encoded x509 certificate. + CONSTRAINT uniqueness UNIQUE (server_name) +); + +CREATE TABLE IF NOT EXISTS server_signature_keys( + server_name TEXT, -- Server name. + key_version TEXT, -- Key version. + key_server TEXT, -- Which key server the key was fetched form. + ts_added_ms INTEGER, -- When the key was added. + verification_key BLOB, -- NACL verification key. + CONSTRAINT uniqueness UNIQUE (server_name, key_version) +); diff --git a/synapse/streams/events.py b/synapse/streams/events.py index c68cf1a59c..08d6e6f733 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -17,6 +17,10 @@ from twisted.internet import defer from synapse.types import StreamToken +from synapse.handlers.presence import PresenceEventSource +from synapse.handlers.room import RoomEventSource +from synapse.handlers.typing import TypingNotificationEventSource + class NullSource(object): """This event source never yields any events and its token remains at @@ -24,146 +28,21 @@ class NullSource(object): def __init__(self, hs): pass - def get_new_events_for_user(self, user, from_token, limit): - return defer.succeed(([], from_token)) + def get_new_events_for_user(self, user, from_key, limit): + return defer.succeed(([], from_key)) - def get_current_token_part(self): + def get_current_key(self): return defer.succeed(0) def get_pagination_rows(self, user, pagination_config, key): return defer.succeed(([], pagination_config.from_token)) -class RoomEventSource(object): - def __init__(self, hs): - self.store = hs.get_datastore() - - @defer.inlineCallbacks - def get_new_events_for_user(self, user, from_token, limit): - # We just ignore the key for now. - - to_key = yield self.get_current_token_part() - - events, end_key = yield self.store.get_room_events_stream( - user_id=user.to_string(), - from_key=from_token.events_key, - to_key=to_key, - room_id=None, - limit=limit, - ) - - end_token = from_token.copy_and_replace("events_key", end_key) - - defer.returnValue((events, end_token)) - - def get_current_token_part(self): - return self.store.get_room_events_max_id() - - @defer.inlineCallbacks - def get_pagination_rows(self, user, pagination_config, key): - from_token = pagination_config.from_token - to_token = pagination_config.to_token - limit = pagination_config.limit - direction = pagination_config.direction - - to_key = to_token.events_key if to_token else None - - events, next_key = yield self.store.paginate_room_events( - room_id=key, - from_key=from_token.events_key, - to_key=to_key, - direction=direction, - limit=limit, - with_feedback=True - ) - - next_token = from_token.copy_and_replace("events_key", next_key) - - defer.returnValue((events, next_token)) - - -class PresenceSource(object): - def __init__(self, hs): - self.hs = hs - self.clock = hs.get_clock() - - def get_new_events_for_user(self, user, from_token, limit): - from_key = int(from_token.presence_key) - - presence = self.hs.get_handlers().presence_handler - cachemap = presence._user_cachemap - - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if from_key < cachemap[k].serial] - - if updates: - clock = self.clock - - latest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - - end_token = from_token.copy_and_replace( - "presence_key", latest_serial - ) - return ((data, end_token)) - else: - end_token = from_token.copy_and_replace( - "presence_key", presence._user_cachemap_latest_serial - ) - return (([], end_token)) - - def get_current_token_part(self): - presence = self.hs.get_handlers().presence_handler - return presence._user_cachemap_latest_serial - - def get_pagination_rows(self, user, pagination_config, key): - # TODO (erikj): Does this make sense? Ordering? - - from_token = pagination_config.from_token - to_token = pagination_config.to_token - - from_key = int(from_token.presence_key) - - if to_token: - to_key = int(to_token.presence_key) - else: - to_key = -1 - - presence = self.hs.get_handlers().presence_handler - cachemap = presence._user_cachemap - - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if to_key < cachemap[k].serial < from_key] - - if updates: - clock = self.clock - - earliest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - - if to_token: - next_token = to_token - else: - next_token = from_token - - next_token = next_token.copy_and_replace( - "presence_key", earliest_serial - ) - return ((data, next_token)) - else: - if not to_token: - to_token = from_token.copy_and_replace( - "presence_key", 0 - ) - return (([], to_token)) - - class EventSources(object): SOURCE_TYPES = { "room": RoomEventSource, - "presence": PresenceSource, + "presence": PresenceEventSource, + "typing": TypingNotificationEventSource, } def __init__(self, hs): @@ -172,24 +51,29 @@ class EventSources(object): for name, cls in EventSources.SOURCE_TYPES.items() } - @staticmethod - def create_token(events_key, presence_key): - return StreamToken(events_key=events_key, presence_key=presence_key) - @defer.inlineCallbacks def get_current_token(self): - events_key = yield self.sources["room"].get_current_token_part() - presence_key = yield self.sources["presence"].get_current_token_part() - token = EventSources.create_token(events_key, presence_key) + token = StreamToken( + room_key=( + yield self.sources["room"].get_current_key() + ), + presence_key=( + yield self.sources["presence"].get_current_key() + ), + typing_key=( + yield self.sources["typing"].get_current_key() + ) + ) defer.returnValue(token) class StreamSource(object): - def get_new_events_for_user(self, user, from_token, limit): + def get_new_events_for_user(self, user, from_key, limit): + """from_key is the key within this event source.""" raise NotImplementedError("get_new_events_for_user") - def get_current_token_part(self): - raise NotImplementedError("get_current_token_part") + def get_current_key(self): + raise NotImplementedError("get_current_key") def get_pagination_rows(self, user, pagination_config, key): raise NotImplementedError("get_rows") diff --git a/synapse/types.py b/synapse/types.py index 63154855dd..1a9dceabf5 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -97,7 +97,7 @@ class RoomID(DomainSpecificString): class StreamToken( namedtuple( "Token", - ("events_key", "presence_key") + ("room_key", "presence_key", "typing_key") ) ): _SEPARATOR = "_" @@ -105,21 +105,14 @@ class StreamToken( @classmethod def from_string(cls, string): try: - events_key, presence_key = string.split(cls._SEPARATOR) + keys = string.split(cls._SEPARATOR) - return cls( - events_key=events_key, - presence_key=presence_key, - ) + return cls(*keys) except: raise SynapseError(400, "Invalid Token") def to_string(self): - return "".join([ - str(self.events_key), - self._SEPARATOR, - str(self.presence_key), - ]) + return self._SEPARATOR.join([str(k) for k in self]) def copy_and_replace(self, key, new_value): d = self._asdict() |