diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2473a2b2bb..54ecbe5b3a 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -17,9 +17,10 @@
from twisted.internet import defer
-from synapse.api.constants import Membership
+from synapse.api.constants import Membership, JoinRules
from synapse.api.errors import AuthError, StoreError, Codes
from synapse.api.events.room import RoomMemberEvent
+from synapse.util.logutils import log_function
import logging
@@ -44,16 +45,29 @@ class Auth(object):
"""
try:
if hasattr(event, "room_id"):
+ is_state = hasattr(event, "state_key")
+
if event.type == RoomMemberEvent.TYPE:
+ yield self._can_replace_state(event)
allowed = yield self.is_membership_change_allowed(event)
defer.returnValue(allowed)
+ return
+
+ self._check_joined_room(
+ member=snapshot.membership_state,
+ user_id=snapshot.user_id,
+ room_id=snapshot.room_id,
+ )
+
+ if is_state:
+                # TODO (erikj): This really should only be called for *new*
+ # state
+ yield self._can_add_state(event)
+ yield self._can_replace_state(event)
else:
- self._check_joined_room(
- member=snapshot.membership_state,
- user_id=snapshot.user_id,
- room_id=snapshot.room_id,
- )
- defer.returnValue(True)
+ yield self._can_send_event(event)
+
+ defer.returnValue(True)
else:
raise AuthError(500, "Unknown event: %s" % event)
except AuthError as e:
@@ -111,7 +125,14 @@ class Auth(object):
membership = event.content["membership"]
+ join_rule = yield self.store.get_room_join_rule(event.room_id)
+ if not join_rule:
+ join_rule = JoinRules.INVITE
+
if Membership.INVITE == membership:
+ # TODO (erikj): We should probably handle this more intelligently
+            # for PRIVATE join rules.
+
# Invites are valid iff caller is in the room and target isn't.
if not caller_in_room: # caller isn't joined
raise AuthError(403, "You are not in room %s." % event.room_id)
@@ -124,18 +145,54 @@ class Auth(object):
# joined: It's a NOOP
if event.user_id != target_user_id:
raise AuthError(403, "Cannot force another user to join.")
- elif room.is_public:
- pass # anyone can join public rooms.
- elif (not caller or caller.membership not in
- [Membership.INVITE, Membership.JOIN]):
- raise AuthError(403, "You are not invited to this room.")
+ elif join_rule == JoinRules.PUBLIC or room.is_public:
+ pass
+ elif join_rule == JoinRules.INVITE:
+ if (
+ not caller or caller.membership not in
+ [Membership.INVITE, Membership.JOIN]
+ ):
+ raise AuthError(403, "You are not invited to this room.")
+ else:
+ # TODO (erikj): may_join list
+ # TODO (erikj): private rooms
+ raise AuthError(403, "You are not allowed to join this room")
elif Membership.LEAVE == membership:
+ # TODO (erikj): Implement kicks.
+
if not caller_in_room: # trying to leave a room you aren't joined
raise AuthError(403, "You are not in room %s." % event.room_id)
elif target_user_id != event.user_id:
- # trying to force another user to leave
- raise AuthError(403, "Cannot force %s to leave." %
- target_user_id)
+ user_level = yield self.store.get_power_level(
+ event.room_id,
+ event.user_id,
+ )
+ _, kick_level = yield self.store.get_ops_levels(event.room_id)
+
+ if kick_level:
+ kick_level = int(kick_level)
+ else:
+ kick_level = 5
+
+ if user_level < kick_level:
+ raise AuthError(
+ 403, "You cannot kick user %s." % target_user_id
+ )
+ elif Membership.BAN == membership:
+ user_level = yield self.store.get_power_level(
+ event.room_id,
+ event.user_id,
+ )
+
+ ban_level, _ = yield self.store.get_ops_levels(event.room_id)
+
+ if ban_level:
+ ban_level = int(ban_level)
+ else:
+ ban_level = 5 # FIXME (erikj): What should we do here?
+
+ if user_level < ban_level:
+ raise AuthError(403, "You don't have permission to ban")
else:
raise AuthError(500, "Unknown membership %s" % membership)
@@ -176,3 +233,85 @@ class Auth(object):
except StoreError:
raise AuthError(403, "Unrecognised access token.",
errcode=Codes.UNKNOWN_TOKEN)
+
+ @defer.inlineCallbacks
+ @log_function
+ def _can_send_event(self, event):
+ send_level = yield self.store.get_send_event_level(event.room_id)
+
+ if send_level:
+ send_level = int(send_level)
+ else:
+ send_level = 0
+
+ user_level = yield self.store.get_power_level(
+ event.room_id,
+ event.user_id,
+ )
+
+ if user_level:
+ user_level = int(user_level)
+ else:
+ user_level = 0
+
+ if user_level < send_level:
+ raise AuthError(
+ 403, "You don't have permission to post to the room"
+ )
+
+ defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def _can_add_state(self, event):
+ add_level = yield self.store.get_add_state_level(event.room_id)
+
+ if not add_level:
+ defer.returnValue(True)
+
+ add_level = int(add_level)
+
+ user_level = yield self.store.get_power_level(
+ event.room_id,
+ event.user_id,
+ )
+
+        user_level = int(user_level) if user_level else 0
+
+ if user_level < add_level:
+ raise AuthError(
+ 403, "You don't have permission to add state to the room"
+ )
+
+ defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def _can_replace_state(self, event):
+ current_state = yield self.store.get_current_state(
+ event.room_id,
+ event.type,
+ event.state_key,
+ )
+
+ if current_state:
+ current_state = current_state[0]
+
+ user_level = yield self.store.get_power_level(
+ event.room_id,
+ event.user_id,
+ )
+
+ if user_level:
+ user_level = int(user_level)
+ else:
+ user_level = 0
+
+ logger.debug("Checking power level for %s, %s", event.user_id, user_level)
+ if current_state and hasattr(current_state, "required_power_level"):
+ req = current_state.required_power_level
+
+ logger.debug("Checked power level for %s, %s", event.user_id, req)
+ if user_level < req:
+ raise AuthError(
+ 403,
+ "You don't have permission to change that state"
+ )
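
(Aside, not part of the patch: the new auth checks above all share one pattern: fetch a stored level for the room, fall back to a default when it is unset, and compare it with the sender's power level. A minimal Python sketch of that pattern; the helper name and default values are illustrative only.)

    from synapse.api.errors import AuthError

    def check_level(user_level, required_level, default_required, action):
        # Stored levels may be None, so substitute defaults first, mirroring
        # _can_send_event and the kick/ban branches above.
        required = int(required_level) if required_level else default_required
        user = int(user_level) if user_level else 0
        if user < required:
            raise AuthError(403, "You don't have permission to %s" % (action,))
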
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index f69f2445a2..668ffa07ca 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -23,7 +23,8 @@ class Membership(object):
JOIN = u"join"
KNOCK = u"knock"
LEAVE = u"leave"
- LIST = (INVITE, JOIN, KNOCK, LEAVE)
+ BAN = u"ban"
+ LIST = (INVITE, JOIN, KNOCK, LEAVE, BAN)
class Feedback(object):
@@ -42,3 +43,10 @@ class PresenceState(object):
UNAVAILABLE = u"unavailable"
ONLINE = u"online"
FREE_FOR_CHAT = u"free_for_chat"
+
+
+class JoinRules(object):
+ PUBLIC = u"public"
+ KNOCK = u"knock"
+ INVITE = u"invite"
+ PRIVATE = u"private"
diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index f9653e0b2a..9502f5df8f 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -42,6 +42,7 @@ class SynapseEvent(JsonEncodedObject):
"user_id", # sender/initiator
"content", # HTTP body, JSON
"state_key",
+ "required_power_level",
]
internal_keys = [
@@ -52,6 +53,7 @@ class SynapseEvent(JsonEncodedObject):
"destinations",
"origin",
"outlier",
+ "power_level",
]
required_keys = [
@@ -152,3 +154,10 @@ class SynapseEvent(JsonEncodedObject):
msg = self._check_json(entry, template[key][0])
if msg:
return msg
+
+
+class SynapseStateEvent(SynapseEvent):
+ def __init__(self, **kwargs):
+ if "state_key" not in kwargs:
+ kwargs["state_key"] = ""
+ super(SynapseStateEvent, self).__init__(**kwargs)
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index c2cdcddf41..159728b2d2 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -16,6 +16,8 @@
from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent,
InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent,
+ RoomPowerLevelsEvent, RoomJoinRulesEvent, RoomOpsPowerLevelsEvent,
+ RoomCreateEvent, RoomAddStateLevelEvent, RoomSendEventLevelEvent
)
from synapse.util.stringutils import random_string
@@ -30,7 +32,13 @@ class EventFactory(object):
RoomMemberEvent,
FeedbackEvent,
InviteJoinEvent,
- RoomConfigEvent
+ RoomConfigEvent,
+ RoomPowerLevelsEvent,
+ RoomJoinRulesEvent,
+ RoomCreateEvent,
+ RoomAddStateLevelEvent,
+ RoomSendEventLevelEvent,
+ RoomOpsPowerLevelsEvent,
]
def __init__(self, hs):
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
index 9faad57ac0..f6d3c59a9a 100644
--- a/synapse/api/events/room.py
+++ b/synapse/api/events/room.py
@@ -15,7 +15,7 @@
from synapse.api.constants import Feedback, Membership
from synapse.api.errors import SynapseError
-from . import SynapseEvent
+from . import SynapseEvent, SynapseStateEvent
class GenericEvent(SynapseEvent):
@@ -132,3 +132,45 @@ class RoomConfigEvent(SynapseEvent):
def get_content_template(self):
return {}
+
+
+class RoomCreateEvent(SynapseStateEvent):
+ TYPE = "m.room.create"
+
+ def get_content_template(self):
+ return {}
+
+
+class RoomJoinRulesEvent(SynapseStateEvent):
+ TYPE = "m.room.join_rules"
+
+ def get_content_template(self):
+ return {}
+
+
+class RoomPowerLevelsEvent(SynapseStateEvent):
+ TYPE = "m.room.power_levels"
+
+ def get_content_template(self):
+ return {}
+
+
+class RoomAddStateLevelEvent(SynapseStateEvent):
+ TYPE = "m.room.add_state_level"
+
+ def get_content_template(self):
+ return {}
+
+
+class RoomSendEventLevelEvent(SynapseStateEvent):
+ TYPE = "m.room.send_event_level"
+
+ def get_content_template(self):
+ return {}
+
+
+class RoomOpsPowerLevelsEvent(SynapseStateEvent):
+ TYPE = "m.room.ops_levels"
+
+ def get_content_template(self):
+ return {}
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 05ca000787..3d0b5de965 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -15,7 +15,7 @@
"""Contains the URL paths to prefix various aspects of the server with. """
-CLIENT_PREFIX = "/matrix/client/api/v1"
-FEDERATION_PREFIX = "/matrix/federation/v1"
-WEB_CLIENT_PREFIX = "/matrix/client"
-CONTENT_REPO_PREFIX = "/matrix/content"
\ No newline at end of file
+CLIENT_PREFIX = "/_matrix/client/api/v1"
+FEDERATION_PREFIX = "/_matrix/federation/v1"
+WEB_CLIENT_PREFIX = "/_matrix/client"
+CONTENT_REPO_PREFIX = "/_matrix/content"
\ No newline at end of file
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 6d292ccf9a..606c9c650d 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,7 +20,6 @@ from synapse.server import HomeServer
from twisted.internet import reactor
from twisted.enterprise import adbapi
-from twisted.python.log import PythonLoggingObserver
from twisted.web.resource import Resource
from twisted.web.static import File
from twisted.web.server import Site
@@ -29,16 +28,17 @@ from synapse.http.client import TwistedHttpClient
from synapse.api.urls import (
CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX
)
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
from daemonize import Daemonize
import twisted.manhole.telnet
-import argparse
import logging
-import logging.config
import sqlite3
import os
import re
+import sys
logger = logging.getLogger(__name__)
@@ -56,13 +56,13 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 1
+SCHEMA_VERSION = 2
class SynapseHomeServer(HomeServer):
def build_http_client(self):
- return TwistedHttpClient()
+ return TwistedHttpClient(self)
def build_resource_for_client(self):
return JsonResource()
@@ -206,37 +206,17 @@ class SynapseHomeServer(HomeServer):
"""
return "%s-%s" % (resource, path_seg)
- def start_listening(self, port):
- reactor.listenTCP(port, Site(self.root_resource))
- logger.info("Synapse now listening on port %d", port)
-
-
-def setup_logging(verbosity=0, filename=None, config_path=None):
- """ Sets up logging with verbosity levels.
-
- Args:
- verbosity: The verbosity level.
- filename: Log to the given file rather than to the console.
- config_path: Path to a python logging config file.
- """
-
- if config_path is None:
- log_format = (
- '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s'
- )
-
- level = logging.INFO
- if verbosity:
- level = logging.DEBUG
-
- # FIXME: we need a logging.WARN for a -q quiet option
-
- logging.basicConfig(level=level, filename=filename, format=log_format)
- else:
- logging.config.fileConfig(config_path)
-
- observer = PythonLoggingObserver()
- observer.start()
+ def start_listening(self, secure_port, unsecure_port):
+ if secure_port is not None:
+ reactor.listenSSL(
+ secure_port, Site(self.root_resource), self.tls_context_factory
+ )
+ logger.info("Synapse now listening on port %d", secure_port)
+ if unsecure_port is not None:
+ reactor.listenTCP(
+ unsecure_port, Site(self.root_resource)
+ )
+ logger.info("Synapse now listening on port %d", unsecure_port)
def run():
@@ -244,78 +224,53 @@ def run():
def setup():
- parser = argparse.ArgumentParser()
- parser.add_argument("-p", "--port", dest="port", type=int, default=8080,
- help="The port to listen on.")
- parser.add_argument("-d", "--database", dest="db", default="homeserver.db",
- help="The database name.")
- parser.add_argument("-H", "--host", dest="host", default="localhost",
- help="The hostname of the server.")
- parser.add_argument('-v', '--verbose', dest="verbose", action='count',
- help="The verbosity level.")
- parser.add_argument('-f', '--log-file', dest="log_file", default=None,
- help="File to log to.")
- parser.add_argument('--log-config', dest="log_config", default=None,
- help="Python logging config")
- parser.add_argument('-D', '--daemonize', action='store_true',
- default=False, help="Daemonize the home server")
- parser.add_argument('--pid-file', dest="pid", help="When running as a "
- "daemon, the file to store the pid in",
- default="hs.pid")
- parser.add_argument("-W", "--webclient", dest="webclient", default=True,
- action="store_false", help="Don't host a web client.")
- parser.add_argument("--manhole", dest="manhole", type=int, default=None,
- help="Turn on the twisted telnet manhole service.")
- args = parser.parse_args()
-
- verbosity = int(args.verbose) if args.verbose else None
-
- # Because if/when we daemonize we change to root dir.
- db_name = os.path.abspath(args.db)
- log_file = args.log_file
- if log_file:
- log_file = os.path.abspath(log_file)
-
- setup_logging(
- verbosity=verbosity,
- filename=log_file,
- config_path=args.log_config,
+ config = HomeServerConfig.load_config(
+ "Synapse Homeserver",
+ sys.argv[1:],
+ generate_section="Homeserver"
)
- logger.info("Server hostname: %s", args.host)
+ config.setup_logging()
- if re.search(":[0-9]+$", args.host):
- domain_with_port = args.host
+ logger.info("Server hostname: %s", config.server_name)
+
+ if re.search(":[0-9]+$", config.server_name):
+ domain_with_port = config.server_name
else:
- domain_with_port = "%s:%s" % (args.host, args.port)
+ domain_with_port = "%s:%s" % (config.server_name, config.bind_port)
+
+ tls_context_factory = context_factory.ServerContextFactory(config)
hs = SynapseHomeServer(
- args.host,
+ config.server_name,
domain_with_port=domain_with_port,
upload_dir=os.path.abspath("uploads"),
- db_name=db_name,
+ db_name=config.database_path,
+ tls_context_factory=tls_context_factory,
)
hs.register_servlets()
hs.create_resource_tree(
- web_client=args.webclient,
- redirect_root_to_web_client=True)
- hs.start_listening(args.port)
+ web_client=config.webclient,
+ redirect_root_to_web_client=True,
+ )
+ hs.start_listening(config.bind_port, config.unsecure_port)
hs.get_db_pool()
- if args.manhole:
+ if config.manhole:
f = twisted.manhole.telnet.ShellFactory()
f.username = "matrix"
f.password = "rabbithole"
f.namespace['hs'] = hs
- reactor.listenTCP(args.manhole, f, interface='127.0.0.1')
+ reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
- if args.daemonize:
+ if config.daemonize:
+ print config.pid_file
daemon = Daemonize(
app="synapse-homeserver",
- pid=args.pid,
+ pid=config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
diff --git a/synapse/config/__init__.py b/synapse/config/__init__.py
new file mode 100644
index 0000000000..fe8a073cd3
--- /dev/null
+++ b/synapse/config/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
new file mode 100644
index 0000000000..1913179c3a
--- /dev/null
+++ b/synapse/config/_base.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import ConfigParser as configparser
+import argparse
+import sys
+import os
+import yaml
+
+
+class ConfigError(Exception):
+ pass
+
+
+class Config(object):
+ def __init__(self, args):
+ pass
+
+ @staticmethod
+ def abspath(file_path):
+ return os.path.abspath(file_path) if file_path else file_path
+
+ @classmethod
+ def check_file(cls, file_path, config_name):
+ if file_path is None:
+ raise ConfigError(
+ "Missing config for %s."
+ " Try running again with --generate-config"
+ % (config_name,)
+ )
+ if not os.path.exists(file_path):
+ raise ConfigError(
+                "Config file %s for %s doesn't exist."
+                " Try running again with --generate-config"
+                % (file_path, config_name,)
+ )
+ return cls.abspath(file_path)
+
+ @classmethod
+ def read_file(cls, file_path, config_name):
+ cls.check_file(file_path, config_name)
+ with open(file_path) as file_stream:
+ return file_stream.read()
+
+ @staticmethod
+ def read_config_file(file_path):
+ with open(file_path) as file_stream:
+ return yaml.load(file_stream)
+
+ @classmethod
+ def add_arguments(cls, parser):
+ pass
+
+ @classmethod
+ def generate_config(cls, args, config_dir_path):
+ pass
+
+ @classmethod
+ def load_config(cls, description, argv, generate_section=None):
+ config_parser = argparse.ArgumentParser(add_help=False)
+ config_parser.add_argument(
+ "-c", "--config-path",
+ metavar="CONFIG_FILE",
+ help="Specify config file"
+ )
+ config_parser.add_argument(
+ "--generate-config",
+ action="store_true",
+ help="Generate config file"
+ )
+ config_args, remaining_args = config_parser.parse_known_args(argv)
+
+ if config_args.generate_config:
+ if not config_args.config_path:
+ config_parser.error(
+ "Must specify where to generate the config file"
+ )
+ config_dir_path = os.path.dirname(config_args.config_path)
+ if os.path.exists(config_args.config_path):
+ defaults = cls.read_config_file(config_args.config_path)
+ else:
+ defaults = {}
+ else:
+ if config_args.config_path:
+ defaults = cls.read_config_file(config_args.config_path)
+ else:
+ defaults = {}
+
+ parser = argparse.ArgumentParser(
+ parents=[config_parser],
+ description=description,
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+ cls.add_arguments(parser)
+ parser.set_defaults(**defaults)
+
+ args = parser.parse_args(remaining_args)
+
+ if config_args.generate_config:
+ config_dir_path = os.path.dirname(config_args.config_path)
+ config_dir_path = os.path.abspath(config_dir_path)
+ if not os.path.exists(config_dir_path):
+ os.makedirs(config_dir_path)
+ cls.generate_config(args, config_dir_path)
+ config = {}
+ for key, value in vars(args).items():
+ if (key not in set(["config_path", "generate_config"])
+ and value is not None):
+ config[key] = value
+ with open(config_args.config_path, "w") as config_file:
+ yaml.dump(config, config_file, default_flow_style=False)
+ sys.exit(0)
+
+ return cls(args)
+
+
+
diff --git a/synapse/config/database.py b/synapse/config/database.py
new file mode 100644
index 0000000000..edf2361914
--- /dev/null
+++ b/synapse/config/database.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+import os
+
+class DatabaseConfig(Config):
+ def __init__(self, args):
+ super(DatabaseConfig, self).__init__(args)
+ self.database_path = self.abspath(args.database_path)
+
+ @classmethod
+ def add_arguments(cls, parser):
+ super(DatabaseConfig, cls).add_arguments(parser)
+ db_group = parser.add_argument_group("database")
+ db_group.add_argument(
+ "-d", "--database-path", default="homeserver.db",
+ help="The database name."
+ )
+
+ @classmethod
+ def generate_config(cls, args, config_dir_path):
+ super(DatabaseConfig, cls).generate_config(args, config_dir_path)
+ args.database_path = os.path.abspath(args.database_path)
+
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
new file mode 100644
index 0000000000..18072e3196
--- /dev/null
+++ b/synapse/config/homeserver.py
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .tls import TlsConfig
+from .server import ServerConfig
+from .logger import LoggingConfig
+from .database import DatabaseConfig
+
+class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig):
+ pass
+
+if __name__ == '__main__':
+ import sys
+ HomeServerConfig.load_config("Generate config", sys.argv[1:], "HomeServer")
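
(A short usage sketch, not part of the patch, mirroring setup() in synapse/app/homeserver.py above; the "homeserver.yaml" path is illustrative.)

    import sys

    from synapse.config.homeserver import HomeServerConfig

    # Reads CLI arguments plus any YAML given via -c/--config-path; with
    # --generate-config it writes the YAML file out and exits instead.
    config = HomeServerConfig.load_config(
        "Synapse Homeserver",
        sys.argv[1:],  # e.g. ["-c", "homeserver.yaml"]
        generate_section="Homeserver",
    )
    config.setup_logging()
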
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
new file mode 100644
index 0000000000..8db6621ae8
--- /dev/null
+++ b/synapse/config/logger.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+from twisted.python.log import PythonLoggingObserver
+import logging
+import logging.config
+
+class LoggingConfig(Config):
+ def __init__(self, args):
+ super(LoggingConfig, self).__init__(args)
+ self.verbosity = int(args.verbose) if args.verbose else None
+ self.log_config = self.abspath(args.log_config)
+ self.log_file = self.abspath(args.log_file)
+
+ @classmethod
+ def add_arguments(cls, parser):
+ super(LoggingConfig, cls).add_arguments(parser)
+ logging_group = parser.add_argument_group("logging")
+ logging_group.add_argument(
+ '-v', '--verbose', dest="verbose", action='count',
+ help="The verbosity level."
+ )
+ logging_group.add_argument(
+ '-f', '--log-file', dest="log_file", default=None,
+ help="File to log to."
+ )
+ logging_group.add_argument(
+ '--log-config', dest="log_config", default=None,
+ help="Python logging config file"
+ )
+
+ def setup_logging(self):
+ log_format = (
+ '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s'
+ )
+ if self.log_config is None:
+
+ level = logging.INFO
+ if self.verbosity:
+ level = logging.DEBUG
+
+ # FIXME: we need a logging.WARN for a -q quiet option
+
+ logging.basicConfig(
+ level=level,
+ filename=self.log_file,
+ format=log_format
+ )
+ else:
+ logging.config.fileConfig(self.log_config)
+
+ observer = PythonLoggingObserver()
+ observer.start()
diff --git a/synapse/config/server.py b/synapse/config/server.py
new file mode 100644
index 0000000000..36143e3c9c
--- /dev/null
+++ b/synapse/config/server.py
@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import nacl.signing
+import os
+from ._base import Config
+from syutil.base64util import encode_base64, decode_base64
+
+
+class ServerConfig(Config):
+ def __init__(self, args):
+ super(ServerConfig, self).__init__(args)
+ self.server_name = args.server_name
+ self.signing_key = self.read_signing_key(args.signing_key_path)
+ self.bind_port = args.bind_port
+ self.bind_host = args.bind_host
+ self.unsecure_port = args.unsecure_port
+ self.daemonize = args.daemonize
+ self.pid_file = self.abspath(args.pid_file)
+ self.webclient = True
+ self.manhole = args.manhole
+
+ @classmethod
+ def add_arguments(cls, parser):
+ super(ServerConfig, cls).add_arguments(parser)
+ server_group = parser.add_argument_group("server")
+ server_group.add_argument("-H", "--server-name", default="localhost",
+ help="The name of the server")
+ server_group.add_argument("--signing-key-path",
+ help="The signing key to sign messages with")
+ server_group.add_argument("-p", "--bind-port", metavar="PORT",
+ type=int, help="https port to listen on",
+ default=8448)
+ server_group.add_argument("--unsecure-port", metavar="PORT",
+ type=int, help="http port to listen on",
+ default=8008)
+ server_group.add_argument("--bind-host", default="",
+ help="Local interface to listen on")
+ server_group.add_argument("-D", "--daemonize", action='store_true',
+ help="Daemonize the home server")
+ server_group.add_argument('--pid-file', default="hs.pid",
+ help="When running as a daemon, the file to"
+ " store the pid in")
+ server_group.add_argument("--manhole", metavar="PORT", dest="manhole",
+ type=int,
+ help="Turn on the twisted telnet manhole"
+ " service on the given port.")
+
+ def read_signing_key(self, signing_key_path):
+ signing_key_base64 = self.read_file(signing_key_path, "signing_key")
+ signing_key_bytes = decode_base64(signing_key_base64)
+ return nacl.signing.SigningKey(signing_key_bytes)
+
+ @classmethod
+ def generate_config(cls, args, config_dir_path):
+ super(ServerConfig, cls).generate_config(args, config_dir_path)
+ base_key_name = os.path.join(config_dir_path, args.server_name)
+
+ args.pid_file = os.path.abspath(args.pid_file)
+
+ if not args.signing_key_path:
+ args.signing_key_path = base_key_name + ".signing.key"
+
+ if not os.path.exists(args.signing_key_path):
+ with open(args.signing_key_path, "w") as signing_key_file:
+ key = nacl.signing.SigningKey.generate()
+ signing_key_file.write(encode_base64(key.encode()))
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
new file mode 100644
index 0000000000..16f6f3aba6
--- /dev/null
+++ b/synapse/config/tls.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+from OpenSSL import crypto
+import subprocess
+import os
+
+GENERATE_DH_PARAMS = False
+
+
+class TlsConfig(Config):
+ def __init__(self, args):
+ super(TlsConfig, self).__init__(args)
+ self.tls_certificate = self.read_tls_certificate(
+ args.tls_certificate_path
+ )
+ self.tls_private_key = self.read_tls_private_key(
+ args.tls_private_key_path
+ )
+ self.tls_dh_params_path = self.check_file(
+ args.tls_dh_params_path, "tls_dh_params"
+ )
+
+ @classmethod
+ def add_arguments(cls, parser):
+ super(TlsConfig, cls).add_arguments(parser)
+ tls_group = parser.add_argument_group("tls")
+ tls_group.add_argument("--tls-certificate-path",
+ help="PEM encoded X509 certificate for TLS")
+ tls_group.add_argument("--tls-private-key-path",
+ help="PEM encoded private key for TLS")
+ tls_group.add_argument("--tls-dh-params-path",
+ help="PEM dh parameters for ephemeral keys")
+
+ def read_tls_certificate(self, cert_path):
+ cert_pem = self.read_file(cert_path, "tls_certificate")
+ return crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
+
+ def read_tls_private_key(self, private_key_path):
+ private_key_pem = self.read_file(private_key_path, "tls_private_key")
+ return crypto.load_privatekey(crypto.FILETYPE_PEM, private_key_pem)
+
+ @classmethod
+ def generate_config(cls, args, config_dir_path):
+ super(TlsConfig, cls).generate_config(args, config_dir_path)
+ base_key_name = os.path.join(config_dir_path, args.server_name)
+
+ if args.tls_certificate_path is None:
+ args.tls_certificate_path = base_key_name + ".tls.crt"
+
+ if args.tls_private_key_path is None:
+ args.tls_private_key_path = base_key_name + ".tls.key"
+
+ if args.tls_dh_params_path is None:
+ args.tls_dh_params_path = base_key_name + ".tls.dh"
+
+ if not os.path.exists(args.tls_private_key_path):
+ with open(args.tls_private_key_path, "w") as private_key_file:
+ tls_private_key = crypto.PKey()
+ tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
+ private_key_pem = crypto.dump_privatekey(
+ crypto.FILETYPE_PEM, tls_private_key
+ )
+ private_key_file.write(private_key_pem)
+ else:
+ with open(args.tls_private_key_path) as private_key_file:
+ private_key_pem = private_key_file.read()
+ tls_private_key = crypto.load_privatekey(
+ crypto.FILETYPE_PEM, private_key_pem
+ )
+
+ if not os.path.exists(args.tls_certificate_path):
+            with open(args.tls_certificate_path, "w") as certificate_file:
+ cert = crypto.X509()
+ subject = cert.get_subject()
+ subject.CN = args.server_name
+
+ cert.set_serial_number(1000)
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
+ cert.set_issuer(cert.get_subject())
+ cert.set_pubkey(tls_private_key)
+
+ cert.sign(tls_private_key, 'sha256')
+
+ cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+
+                certificate_file.write(cert_pem)
+
+ if not os.path.exists(args.tls_dh_params_path):
+ if GENERATE_DH_PARAMS:
+ subprocess.check_call([
+ "openssl", "dhparam",
+ "-outform", "PEM",
+ "-out", args.tls_dh_params_path,
+ "2048"
+ ])
+ else:
+ with open(args.tls_dh_params_path, "w") as dh_params_file:
+ dh_params_file.write(
+ "2048-bit DH parameters taken from rfc3526\n"
+ "-----BEGIN DH PARAMETERS-----\n"
+ "MIIBCAKCAQEA///////////JD9qiIWjC"
+ "NMTGYouA3BzRKQJOCIpnzHQCC76mOxOb\n"
+ "IlFKCHmONATd75UZs806QxswKwpt8l8U"
+ "N0/hNW1tUcJF5IW1dmJefsb0TELppjft\n"
+ "awv/XLb0Brft7jhr+1qJn6WunyQRfEsf"
+ "5kkoZlHs5Fs9wgB8uKFjvwWY2kg2HFXT\n"
+ "mmkWP6j9JM9fg2VdI9yjrZYcYvNWIIVS"
+ "u57VKQdwlpZtZww1Tkq8mATxdGwIyhgh\n"
+ "fDKQXkYuNs474553LBgOhgObJ4Oi7Aei"
+ "j7XFXfBvTFLJ3ivL9pVYFxg5lUl86pVq\n"
+ "5RXSJhiY+gUQFXKOWoqsqmj/////////"
+ "/wIBAg==\n"
+ "-----END DH PARAMETERS-----\n"
+ )
diff --git a/synapse/crypto/config.py b/synapse/crypto/config.py
deleted file mode 100644
index 2330133e71..0000000000
--- a/synapse/crypto/config.py
+++ /dev/null
@@ -1,160 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import ConfigParser as configparser
-import argparse
-import socket
-import sys
-import os
-from OpenSSL import crypto
-import nacl.signing
-from syutil.base64util import encode_base64
-import subprocess
-
-
-def load_config(description, argv):
- config_parser = argparse.ArgumentParser(add_help=False)
- config_parser.add_argument("-c", "--config-path", metavar="CONFIG_FILE",
- help="Specify config file")
- config_args, remaining_args = config_parser.parse_known_args(argv)
- if config_args.config_path:
- config = configparser.SafeConfigParser()
- config.read([config_args.config_path])
- defaults = dict(config.items("KeyServer"))
- else:
- defaults = {}
- parser = argparse.ArgumentParser(
- parents=[config_parser],
- description=description,
- formatter_class=argparse.RawDescriptionHelpFormatter,
- )
- parser.set_defaults(**defaults)
- parser.add_argument("--server-name", default=socket.getfqdn(),
- help="The name of the server")
- parser.add_argument("--signing-key-path",
- help="The signing key to sign responses with")
- parser.add_argument("--tls-certificate-path",
- help="PEM encoded X509 certificate for TLS")
- parser.add_argument("--tls-private-key-path",
- help="PEM encoded private key for TLS")
- parser.add_argument("--tls-dh-params-path",
- help="PEM encoded dh parameters for ephemeral keys")
- parser.add_argument("--bind-port", type=int,
- help="TCP port to listen on")
- parser.add_argument("--bind-host", default="",
- help="Local interface to listen on")
-
- args = parser.parse_args(remaining_args)
-
- server_config = vars(args)
- del server_config["config_path"]
- return server_config
-
-
-def generate_config(argv):
- parser = argparse.ArgumentParser()
- parser.add_argument("-c", "--config-path", help="Specify config file",
- metavar="CONFIG_FILE", required=True)
- parser.add_argument("--server-name", default=socket.getfqdn(),
- help="The name of the server")
- parser.add_argument("--signing-key-path",
- help="The signing key to sign responses with")
- parser.add_argument("--tls-certificate-path",
- help="PEM encoded X509 certificate for TLS")
- parser.add_argument("--tls-private-key-path",
- help="PEM encoded private key for TLS")
- parser.add_argument("--tls-dh-params-path",
- help="PEM encoded dh parameters for ephemeral keys")
- parser.add_argument("--bind-port", type=int, required=True,
- help="TCP port to listen on")
- parser.add_argument("--bind-host", default="",
- help="Local interface to listen on")
-
- args = parser.parse_args(argv)
-
- dir_name = os.path.dirname(args.config_path)
- base_key_name = os.path.join(dir_name, args.server_name)
-
- if args.signing_key_path is None:
- args.signing_key_path = base_key_name + ".signing.key"
-
- if args.tls_certificate_path is None:
- args.tls_certificate_path = base_key_name + ".tls.crt"
-
- if args.tls_private_key_path is None:
- args.tls_private_key_path = base_key_name + ".tls.key"
-
- if args.tls_dh_params_path is None:
- args.tls_dh_params_path = base_key_name + ".tls.dh"
-
- if not os.path.exists(args.signing_key_path):
- with open(args.signing_key_path, "w") as signing_key_file:
- key = nacl.signing.SigningKey.generate()
- signing_key_file.write(encode_base64(key.encode()))
-
- if not os.path.exists(args.tls_private_key_path):
- with open(args.tls_private_key_path, "w") as private_key_file:
- tls_private_key = crypto.PKey()
- tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
- private_key_pem = crypto.dump_privatekey(
- crypto.FILETYPE_PEM, tls_private_key
- )
- private_key_file.write(private_key_pem)
- else:
- with open(args.tls_private_key_path) as private_key_file:
- private_key_pem = private_key_file.read()
- tls_private_key = crypto.load_privatekey(
- crypto.FILETYPE_PEM, private_key_pem
- )
-
- if not os.path.exists(args.tls_certificate_path):
- with open(args.tls_certificate_path, "w") as certifcate_file:
- cert = crypto.X509()
- subject = cert.get_subject()
- subject.CN = args.server_name
-
- cert.set_serial_number(1000)
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
- cert.set_issuer(cert.get_subject())
- cert.set_pubkey(tls_private_key)
-
- cert.sign(tls_private_key, 'sha256')
-
- cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
-
- certifcate_file.write(cert_pem)
-
- if not os.path.exists(args.tls_dh_params_path):
- subprocess.check_call([
- "openssl", "dhparam",
- "-outform", "PEM",
- "-out", args.tls_dh_params_path,
- "2048"
- ])
-
- config = configparser.SafeConfigParser()
- config.add_section("KeyServer")
- for key, value in vars(args).items():
- if key != "config_path":
- config.set("KeyServer", key, str(value))
-
- with open(args.config_path, "w") as config_file:
- config.write(config_file)
-
-
-if __name__ == "__main__":
- generate_config(sys.argv[1:])
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
new file mode 100644
index 0000000000..45958abbf5
--- /dev/null
+++ b/synapse/crypto/context_factory.py
@@ -0,0 +1,29 @@
+from twisted.internet import reactor, ssl
+from OpenSSL import SSL
+from twisted.internet._sslverify import _OpenSSLECCurve, _defaultCurveName
+
+
+class ServerContextFactory(ssl.ContextFactory):
+ """Factory for PyOpenSSL SSL contexts that are used to handle incoming
+ connections and to make connections to remote servers."""
+
+ def __init__(self, config):
+ self._context = SSL.Context(SSL.SSLv23_METHOD)
+ self.configure_context(self._context, config)
+
+ @staticmethod
+ def configure_context(context, config):
+ try:
+ _ecCurve = _OpenSSLECCurve(_defaultCurveName)
+ _ecCurve.addECKeyToContext(context)
+        except Exception:
+ pass
+ context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
+ context.use_certificate(config.tls_certificate)
+ context.use_privatekey(config.tls_private_key)
+ context.load_tmp_dh(config.tls_dh_params_path)
+ context.set_cipher_list("!ADH:HIGH+kEDH:!AECDH:HIGH+kEECDH")
+
+ def getContext(self):
+ return self._context
+
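
(Illustrative wiring only, not part of the patch: the factory is handed to reactor.listenSSL much as SynapseHomeServer.start_listening does above; config is assumed to be a loaded HomeServerConfig and root_resource a twisted.web Resource.)

    from twisted.internet import reactor
    from twisted.web.server import Site

    from synapse.crypto import context_factory

    # Serve the resource tree over TLS on the configured https port.
    tls_context_factory = context_factory.ServerContextFactory(config)
    reactor.listenSSL(config.bind_port, Site(root_resource), tls_context_factory)
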
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 2b2f11f36a..b468f70546 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -68,6 +68,7 @@ class Pdu(JsonEncodedObject):
"power_level",
"prev_state_id",
"prev_state_origin",
+ "required_power_level",
]
internal_keys = [
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3d7f97bcff..4aeb2089f5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -76,6 +76,10 @@ class MessageHandler(BaseRoomHandler):
Raises:
SynapseError if something went wrong.
"""
+ # TODO(paul): Why does 'event' not have a 'user' object?
+ user = self.hs.parse_userid(event.user_id)
+ assert user.is_mine, "User must be our own: %s" % (user,)
+
if stamp_event:
event.content["hsob_ts"] = int(self.clock.time_msec())
@@ -86,6 +90,10 @@ class MessageHandler(BaseRoomHandler):
yield self._on_new_room_event(event, snapshot)
+ self.hs.get_handlers().presence_handler.bump_presence_active_time(
+ user
+ )
+
@defer.inlineCallbacks
def get_messages(self, user_id=None, room_id=None, pagin_config=None,
feedback=False):
@@ -274,11 +282,11 @@ class MessageHandler(BaseRoomHandler):
messages, token = yield self.store.get_recent_events_for_room(
event.room_id,
limit=limit,
- end_token=now_token.events_key,
+ end_token=now_token.room_key,
)
- start_token = now_token.copy_and_replace("events_key", token[0])
- end_token = now_token.copy_and_replace("events_key", token[1])
+ start_token = now_token.copy_and_replace("room_key", token[0])
+ end_token = now_token.copy_and_replace("room_key", token[1])
d["messages"] = {
"chunk": [m.get_dict() for m in messages],
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7731de85c0..9bfceda88a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,13 @@ def partitionbool(l, func):
class PresenceHandler(BaseHandler):
+ STATE_LEVELS = {
+ PresenceState.OFFLINE: 0,
+ PresenceState.UNAVAILABLE: 1,
+ PresenceState.ONLINE: 2,
+ PresenceState.FREE_FOR_CHAT: 3,
+ }
+
def __init__(self, hs):
super(PresenceHandler, self).__init__(hs)
@@ -135,7 +142,7 @@ class PresenceHandler(BaseHandler):
return self._user_cachemap[user]
else:
statuscache = UserPresenceCache()
- statuscache.update({"state": PresenceState.OFFLINE}, user)
+ statuscache.update({"presence": PresenceState.OFFLINE}, user)
return statuscache
def registered_user(self, user):
@@ -143,10 +150,6 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def is_presence_visible(self, observer_user, observed_user):
- defer.returnValue(True)
- # return
- # FIXME (erikj): This code path absolutely kills the database.
-
assert(observed_user.is_mine)
if observer_user == observed_user:
@@ -173,19 +176,24 @@ class PresenceHandler(BaseHandler):
observed_user=target_user
)
- if visible:
- state = yield self.store.get_presence_state(
- target_user.localpart
- )
- else:
+ if not visible:
raise SynapseError(404, "Presence information not visible")
+ state = yield self.store.get_presence_state(target_user.localpart)
+ if "mtime" in state:
+ del state["mtime"]
+ state["presence"] = state["state"]
+
+ if target_user in self._user_cachemap:
+ state["last_active"] = (
+ self._user_cachemap[target_user].get_state()["last_active"]
+ )
else:
# TODO(paul): Have remote server send us permissions set
state = self._get_or_offline_usercache(target_user).get_state()
- if "mtime" in state and (state["mtime"] is not None):
- state["mtime_age"] = int(
- self.clock.time_msec() - state.pop("mtime")
+ if "last_active" in state:
+ state["last_active_ago"] = int(
+ self.clock.time_msec() - state.pop("last_active")
)
defer.returnValue(state)
@@ -202,20 +210,33 @@ class PresenceHandler(BaseHandler):
if target_user != auth_user:
raise AuthError(400, "Cannot set another user's displayname")
- # TODO(paul): Sanity-check 'state'
if "status_msg" not in state:
state["status_msg"] = None
for k in state.keys():
- if k not in ("state", "status_msg"):
+ if k not in ("presence", "state", "status_msg"):
raise SynapseError(
400, "Unexpected presence state key '%s'" % (k,)
)
+ # Handle legacy "state" key for now
+ if "state" in state:
+ state["presence"] = state.pop("state")
+
+ if state["presence"] not in self.STATE_LEVELS:
+            raise SynapseError(
+                400, "'%s' is not a valid presence state" % state["presence"]
+            )
+
logger.debug("Updating presence state of %s to %s",
- target_user.localpart, state["state"])
+ target_user.localpart, state["presence"])
state_to_store = dict(state)
+ state_to_store["state"] = state_to_store.pop("presence")
+
+        statuscache = self._get_or_offline_usercache(target_user)
+ was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
+ now_level = self.STATE_LEVELS[state["presence"]]
yield defer.DeferredList([
self.store.set_presence_state(
@@ -226,9 +247,10 @@ class PresenceHandler(BaseHandler):
),
])
- state["mtime"] = self.clock.time_msec()
+ if now_level > was_level:
+ state["last_active"] = self.clock.time_msec()
- now_online = state["state"] != PresenceState.OFFLINE
+ now_online = state["presence"] != PresenceState.OFFLINE
was_polling = target_user in self._user_cachemap
if now_online and not was_polling:
@@ -240,6 +262,12 @@ class PresenceHandler(BaseHandler):
# we don't have to do this all the time
self.changed_presencelike_data(target_user, state)
+ def bump_presence_active_time(self, user, now=None):
+ if now is None:
+ now = self.clock.time_msec()
+
+ self.changed_presencelike_data(user, {"last_active": now})
+
def changed_presencelike_data(self, user, state):
statuscache = self._get_or_make_usercache(user)
@@ -251,28 +279,27 @@ class PresenceHandler(BaseHandler):
@log_function
def started_user_eventstream(self, user):
# TODO(paul): Use "last online" state
- self.set_state(user, user, {"state": PresenceState.ONLINE})
+ self.set_state(user, user, {"presence": PresenceState.ONLINE})
@log_function
def stopped_user_eventstream(self, user):
# TODO(paul): Save current state as "last online" state
- self.set_state(user, user, {"state": PresenceState.OFFLINE})
+ self.set_state(user, user, {"presence": PresenceState.OFFLINE})
@defer.inlineCallbacks
def user_joined_room(self, user, room_id):
-
if user.is_mine:
- self.push_update_to_local_and_remote(
- observed_user=user,
- room_ids=[room_id],
- statuscache=self._get_or_offline_usercache(user),
- )
+ statuscache = self._get_or_make_usercache(user)
- else:
- self.push_update_to_clients(
+ # No actual update but we need to bump the serial anyway for the
+ # event source
+ self._user_cachemap_latest_serial += 1
+ statuscache.update({}, serial=self._user_cachemap_latest_serial)
+
+ self.push_update_to_local_and_remote(
observed_user=user,
room_ids=[room_id],
- statuscache=self._get_or_offline_usercache(user),
+ statuscache=statuscache,
)
# We also want to tell them about current presence of people.
@@ -386,9 +413,9 @@ class PresenceHandler(BaseHandler):
observed_user = self.hs.parse_userid(p.pop("observed_user_id"))
p["observed_user"] = observed_user
p.update(self._get_or_offline_usercache(observed_user).get_state())
- if "mtime" in p:
- p["mtime_age"] = int(
- self.clock.time_msec() - p.pop("mtime")
+ if "last_active" in p:
+ p["last_active_ago"] = int(
+ self.clock.time_msec() - p.pop("last_active")
)
defer.returnValue(presence)
@@ -457,10 +484,6 @@ class PresenceHandler(BaseHandler):
def _start_polling_local(self, user, target_user):
target_localpart = target_user.localpart
- if not self.is_presence_visible(observer_user=user,
- observed_user=target_user):
- return
-
if target_localpart not in self._local_pushmap:
self._local_pushmap[target_localpart] = set()
@@ -577,21 +600,30 @@ class PresenceHandler(BaseHandler):
def _push_presence_remote(self, user, destination, state=None):
if state is None:
state = yield self.store.get_presence_state(user.localpart)
+ del state["mtime"]
+ state["presence"] = state["state"]
+
+ if user in self._user_cachemap:
+ state["last_active"] = (
+ self._user_cachemap[user].get_state()["last_active"]
+ )
yield self.distributor.fire(
"collect_presencelike_data", user, state
)
- if "mtime" in state:
+ if "last_active" in state:
state = dict(state)
- state["mtime_age"] = int(
- self.clock.time_msec() - state.pop("mtime")
+ state["last_active_ago"] = int(
+ self.clock.time_msec() - state.pop("last_active")
)
user_state = {
"user_id": user.to_string(),
}
user_state.update(**state)
+ if "state" in user_state and "presence" not in user_state:
+ user_state["presence"] = user_state["state"]
yield self.federation.send_edu(
destination=destination,
@@ -618,14 +650,29 @@ class PresenceHandler(BaseHandler):
room_ids = yield rm_handler.get_rooms_for_user(user)
if not observers and not room_ids:
- break
+ continue
state = dict(push)
del state["user_id"]
- if "mtime_age" in state:
- state["mtime"] = int(
- self.clock.time_msec() - state.pop("mtime_age")
+ if "presence" in state:
+ # all is OK
+ pass
+ elif "state" in state:
+ # Legacy handling
+ state["presence"] = state["state"]
+ else:
+                logger.warning("Received a presence 'push' EDU from %s"
+                    " without either a 'presence' or 'state' key", origin
+                )
+ continue
+
+ if "state" in state:
+ del state["state"]
+
+ if "last_active_ago" in state:
+ state["last_active"] = int(
+ self.clock.time_msec() - state.pop("last_active_ago")
)
statuscache = self._get_or_make_usercache(user)
@@ -640,7 +687,7 @@ class PresenceHandler(BaseHandler):
statuscache=statuscache,
)
- if state["state"] == PresenceState.OFFLINE:
+ if state["presence"] == PresenceState.OFFLINE:
del self._user_cachemap[user]
for poll in content.get("poll", []):
@@ -673,10 +720,9 @@ class PresenceHandler(BaseHandler):
yield defer.DeferredList(deferreds)
@defer.inlineCallbacks
- def push_update_to_local_and_remote(self, observed_user,
+ def push_update_to_local_and_remote(self, observed_user, statuscache,
users_to_push=[], room_ids=[],
- remote_domains=[],
- statuscache=None):
+ remote_domains=[]):
localusers, remoteusers = partitionbool(
users_to_push,
@@ -722,6 +768,78 @@ class PresenceHandler(BaseHandler):
)
+class PresenceEventSource(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.clock = hs.get_clock()
+
+ def get_new_events_for_user(self, user, from_key, limit):
+ from_key = int(from_key)
+
+ presence = self.hs.get_handlers().presence_handler
+ cachemap = presence._user_cachemap
+
+ # TODO(paul): limit, and filter by visibility
+ updates = [(k, cachemap[k]) for k in cachemap
+ if from_key < cachemap[k].serial]
+
+ if updates:
+ clock = self.clock
+
+ latest_serial = max([x[1].serial for x in updates])
+ data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
+
+ return ((data, latest_serial))
+ else:
+ return (([], presence._user_cachemap_latest_serial))
+
+ def get_current_key(self):
+ presence = self.hs.get_handlers().presence_handler
+ return presence._user_cachemap_latest_serial
+
+ def get_pagination_rows(self, user, pagination_config, key):
+ # TODO (erikj): Does this make sense? Ordering?
+
+ from_token = pagination_config.from_token
+ to_token = pagination_config.to_token
+
+ from_key = int(from_token.presence_key)
+
+ if to_token:
+ to_key = int(to_token.presence_key)
+ else:
+ to_key = -1
+
+ presence = self.hs.get_handlers().presence_handler
+ cachemap = presence._user_cachemap
+
+ # TODO(paul): limit, and filter by visibility
+ updates = [(k, cachemap[k]) for k in cachemap
+ if to_key < cachemap[k].serial < from_key]
+
+ if updates:
+ clock = self.clock
+
+ earliest_serial = max([x[1].serial for x in updates])
+ data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
+
+ if to_token:
+ next_token = to_token
+ else:
+ next_token = from_token
+
+ next_token = next_token.copy_and_replace(
+ "presence_key", earliest_serial
+ )
+ return ((data, next_token))
+ else:
+ if not to_token:
+ to_token = from_token.copy_and_replace(
+ "presence_key", 0
+ )
+ return (([], to_token))
+
+
class UserPresenceCache(object):
"""Store an observed user's state and status message.
@@ -733,6 +851,7 @@ class UserPresenceCache(object):
def update(self, state, serial):
assert("mtime_age" not in state)
+ assert("state" not in state)
self.state.update(state)
# Delete keys that are now 'None'
@@ -749,15 +868,21 @@ class UserPresenceCache(object):
def get_state(self):
# clone it so caller can't break our cache
- return dict(self.state)
+ state = dict(self.state)
+
+ # Legacy handling
+ if "presence" in state:
+ state["state"] = state["presence"]
+
+ return state
def make_event(self, user, clock):
content = self.get_state()
content["user_id"] = user.to_string()
- if "mtime" in content:
- content["mtime_age"] = int(
- clock.time_msec() - content.pop("mtime")
+ if "last_active" in content:
+ content["last_active_ago"] = int(
+ clock.time_msec() - content.pop("last_active")
)
return {"type": "m.presence", "content": content}
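
(A minimal sketch, not part of the patch, of the timestamp handling introduced above: absolute last_active values become relative last_active_ago ages when state is sent out, and are converted back on receipt of a push EDU.)

    def to_wire(state, now_ms):
        # Outgoing: replace the absolute timestamp with an age in ms.
        state = dict(state)
        if "last_active" in state:
            state["last_active_ago"] = int(now_ms - state.pop("last_active"))
        return state

    def from_wire(state, now_ms):
        # Incoming: turn the age back into an absolute timestamp.
        state = dict(state)
        if "last_active_ago" in state:
            state["last_active"] = int(now_ms - state.pop("last_active_ago"))
        return state
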
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3e41d7a46b..53aa77405c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -17,10 +17,12 @@
from twisted.internet import defer
from synapse.types import UserID, RoomAlias, RoomID
-from synapse.api.constants import Membership
+from synapse.api.constants import Membership, JoinRules
from synapse.api.errors import StoreError, SynapseError
from synapse.api.events.room import (
- RoomMemberEvent, RoomConfigEvent
+ RoomMemberEvent, RoomCreateEvent, RoomPowerLevelsEvent,
+ RoomJoinRulesEvent, RoomAddStateLevelEvent, RoomTopicEvent,
+ RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomNameEvent,
)
from synapse.util import stringutils
from ._base import BaseRoomHandler
@@ -62,6 +64,8 @@ class RoomCreationHandler(BaseRoomHandler):
else:
room_alias = None
+ is_public = config.get("visibility", None) == "public"
+
if room_id:
# Ensure room_id is the correct type
room_id_obj = RoomID.from_string(room_id, self.hs)
@@ -71,7 +75,7 @@ class RoomCreationHandler(BaseRoomHandler):
yield self.store.store_room(
room_id=room_id,
room_creator_user_id=user_id,
- is_public=config["visibility"] == "public"
+ is_public=is_public
)
else:
# autogen room IDs and try to create it. We may clash, so just
@@ -85,7 +89,7 @@ class RoomCreationHandler(BaseRoomHandler):
yield self.store.store_room(
room_id=gen_room_id.to_string(),
room_creator_user_id=user_id,
- is_public=config["visibility"] == "public"
+ is_public=is_public
)
room_id = gen_room_id.to_string()
break
@@ -94,18 +98,9 @@ class RoomCreationHandler(BaseRoomHandler):
if not room_id:
raise StoreError(500, "Couldn't generate a room ID.")
- config_event = self.event_factory.create_event(
- etype=RoomConfigEvent.TYPE,
- room_id=room_id,
- user_id=user_id,
- content=config,
- )
-
- snapshot = yield self.store.snapshot_room(
- room_id=room_id,
- user_id=user_id,
- state_type=RoomConfigEvent.TYPE,
- state_key="",
+ user = self.hs.parse_userid(user_id)
+ creation_events = self._create_events_for_new_room(
+ user, room_id, is_public=is_public
)
if room_alias:
@@ -115,11 +110,46 @@ class RoomCreationHandler(BaseRoomHandler):
servers=[self.hs.hostname],
)
- yield self.state_handler.handle_new_event(config_event, snapshot)
- # store_id = persist...
-
federation_handler = self.hs.get_handlers().federation_handler
- yield federation_handler.handle_new_event(config_event, snapshot)
+
+ @defer.inlineCallbacks
+ def handle_event(event):
+ snapshot = yield self.store.snapshot_room(
+ room_id=room_id,
+ user_id=user_id,
+ )
+
+ logger.debug("Event: %s", event)
+
+ yield self.state_handler.handle_new_event(event, snapshot)
+ yield self._on_new_room_event(event, snapshot, extra_users=[user])
+
+ for event in creation_events:
+ yield handle_event(event)
+
+ if "name" in config:
+ name = config["name"]
+ name_event = self.event_factory.create_event(
+ etype=RoomNameEvent.TYPE,
+ room_id=room_id,
+ user_id=user_id,
+ required_power_level=5,
+ content={"name": name},
+ )
+
+ yield handle_event(name_event)
+
+ if "topic" in config:
+ topic = config["topic"]
+ topic_event = self.event_factory.create_event(
+ etype=RoomTopicEvent.TYPE,
+ room_id=room_id,
+ user_id=user_id,
+ required_power_level=5,
+ content={"topic": topic},
+ )
+
+ yield handle_event(topic_event)
content = {"membership": Membership.JOIN}
join_event = self.event_factory.create_event(
@@ -142,6 +172,62 @@ class RoomCreationHandler(BaseRoomHandler):
defer.returnValue(result)
+ def _create_events_for_new_room(self, creator, room_id, is_public=False):
+ event_keys = {
+ "room_id": room_id,
+ "user_id": creator.to_string(),
+ "required_power_level": 10,
+ }
+
+ def create(etype, **content):
+ return self.event_factory.create_event(
+ etype=etype,
+ content=content,
+ **event_keys
+ )
+
+ creation_event = create(
+ etype=RoomCreateEvent.TYPE,
+ creator=creator.to_string(),
+ )
+
+ power_levels_event = self.event_factory.create_event(
+ etype=RoomPowerLevelsEvent.TYPE,
+ content={creator.to_string(): 10, "default": 0},
+ **event_keys
+ )
+
+ join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE
+ join_rules_event = create(
+ etype=RoomJoinRulesEvent.TYPE,
+ join_rule=join_rule,
+ )
+
+ add_state_event = create(
+ etype=RoomAddStateLevelEvent.TYPE,
+ level=10,
+ )
+
+ send_event = create(
+ etype=RoomSendEventLevelEvent.TYPE,
+ level=0,
+ )
+
+ ops = create(
+ etype=RoomOpsPowerLevelsEvent.TYPE,
+ ban_level=5,
+ kick_level=5,
+ )
+
+ return [
+ creation_event,
+ power_levels_event,
+ join_rules_event,
+ add_state_event,
+ send_event,
+ ops,
+ ]
+
class RoomMemberHandler(BaseRoomHandler):
# TODO(paul): This handler currently contains a messy conflation of
@@ -285,6 +371,16 @@ class RoomMemberHandler(BaseRoomHandler):
if do_auth:
yield self.auth.check(event, snapshot, raises=True)
+ # If we're banning someone, set a required power level
+ if event.membership == Membership.BAN:
+ if getattr(event, "required_power_level", None) is None:
+ # Add some default required_power_level
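+ # (presumably so the ban can only be overwritten by someone at least
+ # as powerful as the user who set it)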
+ user_level = yield self.store.get_power_level(
+ event.room_id,
+ event.user_id,
+ )
+ event.required_power_level = user_level
+
if prev_state and prev_state.membership == event.membership:
# double same action, treat this event as a NOOP.
defer.returnValue({})
@@ -445,10 +541,9 @@ class RoomMemberHandler(BaseRoomHandler):
host = target_user.domain
destinations.append(host)
- # If we are joining a remote HS, include that.
- if membership == Membership.JOIN:
- host = target_user.domain
- destinations.append(host)
+ # Always include target domain
+ host = target_user.domain
+ destinations.append(host)
return self._on_new_room_event(
event, snapshot, extra_destinations=destinations,
@@ -462,3 +557,49 @@ class RoomListHandler(BaseRoomHandler):
chunk = yield self.store.get_rooms(is_public=True)
# FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
+
+
+class RoomEventSource(object):
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def get_new_events_for_user(self, user, from_key, limit):
+ # We just ignore the key for now.
+
+ to_key = yield self.get_current_key()
+
+ events, end_key = yield self.store.get_room_events_stream(
+ user_id=user.to_string(),
+ from_key=from_key,
+ to_key=to_key,
+ room_id=None,
+ limit=limit,
+ )
+
+ defer.returnValue((events, end_key))
+
+ def get_current_key(self):
+ return self.store.get_room_events_max_id()
+
+ @defer.inlineCallbacks
+ def get_pagination_rows(self, user, pagination_config, key):
+ from_token = pagination_config.from_token
+ to_token = pagination_config.to_token
+ limit = pagination_config.limit
+ direction = pagination_config.direction
+
+ to_key = to_token.room_key if to_token else None
+
+ events, next_key = yield self.store.paginate_room_events(
+ room_id=key,
+ from_key=from_token.room_key,
+ to_key=to_key,
+ direction=direction,
+ limit=limit,
+ with_feedback=True
+ )
+
+ next_token = from_token.copy_and_replace("room_key", next_key)
+
+ defer.returnValue((events, next_token))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 9fab0ff37c..3268427ecd 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -145,3 +145,17 @@ class TypingNotificationHandler(BaseHandler):
typing):
# TODO(paul) steal this from presence.py
pass
+
+
+class TypingNotificationEventSource(object):
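+ # Stub event source: typing notifications aren't streamed yet, so this
+ # always reports no new events and leaves stream keys unchanged.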
+ def __init__(self, hs):
+ self.hs = hs
+
+ def get_new_events_for_user(self, user, from_key, limit):
+ return ([], from_key)
+
+ def get_current_key(self):
+ return 0
+
+ def get_pagination_rows(self, user, pagination_config, key):
+ return ([], pagination_config.from_token)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 36ba2c6591..093bdf0e3f 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -113,8 +113,9 @@ class TwistedHttpClient(HttpClient):
requests.
"""
- def __init__(self):
+ def __init__(self, hs):
self.agent = MatrixHttpAgent(reactor)
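+ # Stash the homeserver so outgoing connections can pick up its TLS
+ # context factory when building endpoints.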
+ self.hs = hs
@defer.inlineCallbacks
def put_json(self, destination, path, data):
@@ -177,7 +178,10 @@ class TwistedHttpClient(HttpClient):
retries_left = 5
# TODO: setup and pass in an ssl_context to enable TLS
- endpoint = matrix_endpoint(reactor, destination, timeout=10)
+ endpoint = matrix_endpoint(
+ reactor, destination, timeout=10,
+ ssl_context_factory=self.hs.tls_context_factory
+ )
while True:
try:
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index d91500b07d..a6ebe23567 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -53,7 +53,7 @@ def matrix_endpoint(reactor, destination, ssl_context_factory=None,
default_port = 8080
else:
transport_endpoint = SSL4ClientEndpoint
- endpoint_kw_args.update(ssl_context_factory=ssl_context_factory)
+ endpoint_kw_args.update(sslContextFactory=ssl_context_factory)
default_port = 443
if port is None:
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 66f966fcaa..0b87718bfa 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -325,7 +325,11 @@ class ContentRepoResource(resource.Resource):
# FIXME (erikj): These should use constants.
file_name = os.path.basename(fname)
- url = "http://%s/matrix/content/%s" % (
+ # FIXME: we can't assume what the public mounted path of the repo is
+ # ...plus self-signed SSL won't work for remote clients anyway
+ # ...and we can't assume that it's SSL anyway, as we might want to
+ # serve it via the non-SSL listener...
+ url = "https://%s/_matrix/content/%s" % (
self.hs.domain_with_port, file_name
)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 253f60983b..3260aa744f 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -95,7 +95,7 @@ class Notifier(object):
"""
room_id = event.room_id
- source = self.event_sources.sources["room"]
+ room_source = self.event_sources.sources["room"]
listeners = self.rooms_to_listeners.get(room_id, set()).copy()
@@ -109,13 +109,17 @@ class Notifier(object):
@defer.inlineCallbacks
def notify(listener):
- events, end_token = yield source.get_new_events_for_user(
+ events, end_key = yield room_source.get_new_events_for_user(
listener.user,
- listener.from_token,
+ listener.from_token.room_key,
listener.limit,
)
if events:
+ end_token = listener.from_token.copy_and_replace(
+ "room_key", end_key
+ )
+
listener.notify(
self, events, listener.from_token, end_token
)
@@ -135,7 +139,7 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms.
"""
- source = self.event_sources.sources["presence"]
+ presence_source = self.event_sources.sources["presence"]
listeners = set()
@@ -147,13 +151,17 @@ class Notifier(object):
@defer.inlineCallbacks
def notify(listener):
- events, end_token = yield source.get_new_events_for_user(
+ events, end_key = yield presence_source.get_new_events_for_user(
listener.user,
- listener.from_token,
+ listener.from_token.presence_key,
listener.limit,
)
if events:
+ end_token = listener.from_token.copy_and_replace(
+ "presence_key", end_key
+ )
+
listener.notify(
self, events, listener.from_token, end_token
)
@@ -233,16 +241,18 @@ class Notifier(object):
limit = listener.limit
# TODO (erikj): DeferredList?
- for source in self.event_sources.sources.values():
- stuff, new_token = yield source.get_new_events_for_user(
+ for name, source in self.event_sources.sources.items():
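+ # Each source's stream position lives in the token field named
+ # "<source>_key" (e.g. room_key, presence_key), so look it up and
+ # write it back by name.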
+ keyname = "%s_key" % name
+
+ stuff, new_key = yield source.get_new_events_for_user(
listener.user,
- from_token,
+ getattr(from_token, keyname),
limit,
)
events.extend(stuff)
- from_token = new_token
+ from_token = from_token.copy_and_replace(keyname, new_key)
end_token = from_token
diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py
index e013b20853..bce3943542 100644
--- a/synapse/rest/presence.py
+++ b/synapse/rest/presence.py
@@ -48,7 +48,11 @@ class PresenceStatusRestServlet(RestServlet):
try:
content = json.loads(request.content.read())
- state["state"] = content.pop("state")
+ # Legacy handling
+ if "state" in content:
+ state["presence"] = content.pop("state")
+ else:
+ state["presence"] = content.pop("presence")
if "status_msg" in content:
state["status_msg"] = content.pop("status_msg")
diff --git a/synapse/rest/profile.py b/synapse/rest/profile.py
index 2f010c0182..06076667c7 100644
--- a/synapse/rest/profile.py
+++ b/synapse/rest/profile.py
@@ -87,6 +87,27 @@ class ProfileAvatarURLRestServlet(RestServlet):
return (200, {})
+class ProfileRestServlet(RestServlet):
+ PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)")
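+ # Combined profile lookup: returns both displayname and avatar_url in
+ # a single response.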
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, user_id):
+ user = self.hs.parse_userid(user_id)
+
+ displayname = yield self.handlers.profile_handler.get_displayname(
+ user,
+ )
+ avatar_url = yield self.handlers.profile_handler.get_avatar_url(
+ user,
+ )
+
+ defer.returnValue((200, {
+ "displayname": displayname,
+ "avatar_url": avatar_url
+ }))
+
+
def register_servlets(hs, http_server):
ProfileDisplaynameRestServlet(hs).register(http_server)
ProfileAvatarURLRestServlet(hs).register(http_server)
+ ProfileRestServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e8faba3eeb..aadaab06e7 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -19,6 +19,11 @@ from synapse.api.events.room import (
RoomMemberEvent, RoomTopicEvent, FeedbackEvent,
# RoomConfigEvent,
RoomNameEvent,
+ RoomJoinRulesEvent,
+ RoomPowerLevelsEvent,
+ RoomAddStateLevelEvent,
+ RoomSendEventLevelEvent,
+ RoomOpsPowerLevelsEvent,
)
from synapse.util.logutils import log_function
@@ -33,6 +38,7 @@ from .roommember import RoomMemberStore
from .stream import StreamStore
from .pdu import StatePduStore, PduStore
from .transactions import TransactionStore
+from .keys import KeyStore
import json
import logging
@@ -45,7 +51,7 @@ logger = logging.getLogger(__name__)
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, PduStore, StatePduStore, TransactionStore,
- DirectoryStore):
+ DirectoryStore, KeyStore):
def __init__(self, hs):
super(DataStore, self).__init__(hs)
@@ -122,13 +128,21 @@ class DataStore(RoomMemberStore, RoomStore,
if event.type == RoomMemberEvent.TYPE:
self._store_room_member_txn(txn, event)
elif event.type == FeedbackEvent.TYPE:
- self._store_feedback_txn(txn,event)
-# elif event.type == RoomConfigEvent.TYPE:
-# self._store_room_config_txn(txn, event)
+ self._store_feedback_txn(txn, event)
elif event.type == RoomNameEvent.TYPE:
self._store_room_name_txn(txn, event)
elif event.type == RoomTopicEvent.TYPE:
self._store_room_topic_txn(txn, event)
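+ # The new permission-related state events are denormalised into their
+ # own tables so the auth checks can look them up with simple joins.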
+ elif event.type == RoomJoinRulesEvent.TYPE:
+ self._store_join_rule(txn, event)
+ elif event.type == RoomPowerLevelsEvent.TYPE:
+ self._store_power_levels(txn, event)
+ elif event.type == RoomAddStateLevelEvent.TYPE:
+ self._store_add_state_level(txn, event)
+ elif event.type == RoomSendEventLevelEvent.TYPE:
+ self._store_send_event_level(txn, event)
+ elif event.type == RoomOpsPowerLevelsEvent.TYPE:
+ self._store_ops_level(txn, event)
vals = {
"topological_ordering": event.depth,
@@ -222,7 +236,6 @@ class DataStore(RoomMemberStore, RoomStore,
defer.returnValue(self.min_token)
-
def snapshot_room(self, room_id, user_id, state_type=None, state_key=None):
"""Snapshot the room for an update by a user
Args:
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
new file mode 100644
index 0000000000..4d19b9f641
--- /dev/null
+++ b/synapse/storage/keys.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from _base import SQLBaseStore
+
+from twisted.internet import defer
+
+import OpenSSL
+import nacl.signing
+
+class KeyStore(SQLBaseStore):
+ """Persistence for signature verification keys and tls X.509 certificates
+ """
+
+ @defer.inlineCallbacks
+ def get_server_certificate(self, server_name):
+ """Retrieve the TLS X.509 certificate for the given server
+ Args:
+ server_name (bytes): The name of the server.
+ Returns:
+ (OpenSSL.crypto.X509): The tls certificate.
+ """
+ tls_certificate_bytes, = yield self._simple_select_one(
+ table="server_tls_certificates",
+ keyvalues={"server_name": server_name},
+ retcols=("tls_certificate",),
+ )
+ tls_certificate = OpenSSL.crypto.load_certificate(
+ OpenSSL.crypto.FILETYPE_ASN1, tls_certificate_bytes,
+ )
+ defer.returnValue(tls_certificate)
+
+ def store_server_certificate(self, server_name, key_server, ts_now_ms,
+ tls_certificate):
+ """Stores the TLS X.509 certificate for the given server
+ Args:
+ server_name (bytes): The name of the server.
+ key_server (bytes): Where the certificate was looked up
+ ts_now_ms (int): The time now in milliseconds
+ tls_certificate (OpenSSL.crypto.X509): The X.509 certificate.
+ """
+ tls_certificate_bytes = OpenSSL.crypto.dump_certificate(
+ OpenSSL.crypto.FILETYPE_ASN1, tls_certificate
+ )
+ return self._simple_insert(
+ table="server_tls_certificates",
+ keyvalues={
+ "server_name": server_name,
+ "key_server": key_server,
+ "ts_added_ms": ts_now_ms,
+ "tls_certificate": tls_certificate_bytes,
+ },
+ )
+
+ @defer.inlineCallbacks
+ def get_server_verification_key(self, server_name):
+ """Retrieve the NACL verification key for a given server
+ Args:
+ server_name (bytes): The name of the server.
+ Returns:
+ (nacl.signing.VerifyKey): The verification key.
+ """
+ verification_key_bytes, = yield self._simple_select_one(
+ table="server_signature_keys",
+ key_values={"server_name": server_name},
+ retcols=("tls_certificate",),
+ )
+ verification_key = nacl.signing.VerifyKey(verification_key_bytes)
+ defer.returnValue(verification_key)
+
+ def store_server_verification_key(self, server_name, key_version,
+ key_server, ts_now_ms, verification_key):
+ """Stores a NACL verification key for the given server.
+ Args:
+ server_name (bytes): The name of the server.
+ key_version (bytes): The version of the key for the server.
+ key_server (bytes): Where the verification key was looked up
+ ts_now_ms (int): The time now in milliseconds
+ verification_key (nacl.signing.VerifyKey): The NACL verify key.
+ """
+ verification_key_bytes = verification_key.encode()
+ return self._simple_insert(
+ table="server_signature_keys",
+ keyvalues={
+ "server_name": server_name,
+ "key_version": key_version,
+ "key_server": key_server,
+ "ts_added_ms": ts_now_ms,
+ "verification_key": verification_key_bytes,
+ },
+ )
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index d1f1a232f8..01ae190316 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -27,6 +27,9 @@ import logging
logger = logging.getLogger(__name__)
+OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level"))
+
+
class RoomStore(SQLBaseStore):
@defer.inlineCallbacks
@@ -129,6 +132,98 @@ class RoomStore(SQLBaseStore):
defer.returnValue(ret)
+ @defer.inlineCallbacks
+ def get_room_join_rule(self, room_id):
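+ # The rule in force is whichever join_rules event is still part of
+ # the room's current state, hence the join on current_state_events.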
+ sql = (
+ "SELECT join_rule FROM room_join_rules as r "
+ "INNER JOIN current_state_events as c "
+ "ON r.event_id = c.event_id "
+ "WHERE c.room_id = ? "
+ )
+
+ rows = yield self._execute(None, sql, room_id)
+
+ if len(rows) == 1:
+ defer.returnValue(rows[0][0])
+ else:
+ defer.returnValue(None)
+
+ def get_power_level(self, room_id, user_id):
+ return self._db_pool.runInteraction(
+ self._get_power_level,
+ room_id, user_id,
+ )
+
+ def _get_power_level(self, txn, room_id, user_id):
+ sql = (
+ "SELECT level FROM room_power_levels as r "
+ "INNER JOIN current_state_events as c "
+ "ON r.event_id = c.event_id "
+ "WHERE c.room_id = ? AND r.user_id = ? "
+ )
+
+ rows = txn.execute(sql, (room_id, user_id,)).fetchall()
+
+ if len(rows) == 1:
+ return rows[0][0]
+
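+ # No per-user entry; fall back to the room's default level, if any.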
+ sql = (
+ "SELECT level FROM room_default_levels as r "
+ "INNER JOIN current_state_events as c "
+ "ON r.event_id = c.event_id "
+ "WHERE c.room_id = ? "
+ )
+
+ rows = txn.execute(sql, (room_id,)).fetchall()
+
+ if len(rows) == 1:
+ return rows[0][0]
+ else:
+ return None
+
+ def get_ops_levels(self, room_id):
+ return self._db_pool.runInteraction(
+ self._get_ops_levels,
+ room_id,
+ )
+
+ def _get_ops_levels(self, txn, room_id):
+ sql = (
+ "SELECT ban_level, kick_level FROM room_ops_levels as r "
+ "INNER JOIN current_state_events as c "
+ "ON r.event_id = c.event_id "
+ "WHERE c.room_id = ? "
+ )
+
+ rows = txn.execute(sql, (room_id,)).fetchall()
+
+ if len(rows) == 1:
+ return OpsLevel(rows[0][0], rows[0][1])
+ else:
+ return OpsLevel(None, None)
+
+ def get_add_state_level(self, room_id):
+ return self._get_level_from_table("room_add_state_levels", room_id)
+
+ def get_send_event_level(self, room_id):
+ return self._get_level_from_table("room_send_event_levels", room_id)
+
+ @defer.inlineCallbacks
+ def _get_level_from_table(self, table, room_id):
+ sql = (
+ "SELECT level FROM %(table)s as r "
+ "INNER JOIN current_state_events as c "
+ "ON r.event_id = c.event_id "
+ "WHERE c.room_id = ? "
+ ) % {"table": table}
+
+ rows = yield self._execute(None, sql, room_id)
+
+ if len(rows) == 1:
+ defer.returnValue(rows[0][0])
+ else:
+ defer.returnValue(None)
+
def _store_room_topic_txn(self, txn, event):
self._simple_insert_txn(
txn,
@@ -151,6 +246,92 @@ class RoomStore(SQLBaseStore):
}
)
+ def _store_join_rule(self, txn, event):
+ self._simple_insert_txn(
+ txn,
+ "room_join_rules",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "join_rule": event.content["join_rule"],
+ },
+ )
+
+ def _store_power_levels(self, txn, event):
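+ # The event content maps user_id -> level, with the special key
+ # "default" giving the level for users not listed explicitly.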
+ for user_id, level in event.content.items():
+ if user_id == "default":
+ self._simple_insert_txn(
+ txn,
+ "room_default_levels",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "level": level,
+ },
+ )
+ else:
+ self._simple_insert_txn(
+ txn,
+ "room_power_levels",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "user_id": user_id,
+ "level": level
+ },
+ )
+
+ def _store_default_level(self, txn, event):
+ self._simple_insert_txn(
+ txn,
+ "room_default_levels",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "level": event.content["default_level"],
+ },
+ )
+
+ def _store_add_state_level(self, txn, event):
+ self._simple_insert_txn(
+ txn,
+ "room_add_state_levels",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "level": event.content["level"],
+ },
+ )
+
+ def _store_send_event_level(self, txn, event):
+ self._simple_insert_txn(
+ txn,
+ "room_send_event_levels",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "level": event.content["level"],
+ },
+ )
+
+ def _store_ops_level(self, txn, event):
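+ # ban_level and kick_level are both optional; only store whichever
+ # ones are present in the event content.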
+ content = {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ }
+
+ if "kick_level" in event.content:
+ content["kick_level"] = event.content["kick_level"]
+
+ if "ban_level" in event.content:
+ content["ban_level"] = event.content["ban_level"]
+
+ self._simple_insert_txn(
+ txn,
+ "room_ops_levels",
+ content,
+ )
+
class RoomsTable(Table):
table_name = "rooms"
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index e92f21ef3b..dbefbbda31 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -96,8 +96,71 @@ CREATE TABLE IF NOT EXISTS rooms(
creator TEXT
);
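+
+-- Lookup tables for the permission-related state events. Rows are keyed
+-- by the event that set them; the value currently in force is found by
+-- joining against current_state_events.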
+CREATE TABLE IF NOT EXISTS room_join_rules(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ join_rule TEXT NOT NULL
+);
+CREATE INDEX IF NOT EXISTS room_join_rules_event_id ON room_join_rules(event_id);
+CREATE INDEX IF NOT EXISTS room_join_rules_room_id ON room_join_rules(room_id);
+
+
+CREATE TABLE IF NOT EXISTS room_power_levels(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ level INTEGER NOT NULL
+);
+CREATE INDEX IF NOT EXISTS room_power_levels_event_id ON room_power_levels(event_id);
+CREATE INDEX IF NOT EXISTS room_power_levels_room_id ON room_power_levels(room_id);
+CREATE INDEX IF NOT EXISTS room_power_levels_room_user ON room_power_levels(room_id, user_id);
+
+
+CREATE TABLE IF NOT EXISTS room_default_levels(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ level INTEGER NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS room_default_levels_event_id ON room_default_levels(event_id);
+CREATE INDEX IF NOT EXISTS room_default_levels_room_id ON room_default_levels(room_id);
+
+
+CREATE TABLE IF NOT EXISTS room_add_state_levels(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ level INTEGER NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS room_add_state_levels_event_id ON room_add_state_levels(event_id);
+CREATE INDEX IF NOT EXISTS room_add_state_levels_room_id ON room_add_state_levels(room_id);
+
+
+CREATE TABLE IF NOT EXISTS room_send_event_levels(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ level INTEGER NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS room_send_event_levels_event_id ON room_send_event_levels(event_id);
+CREATE INDEX IF NOT EXISTS room_send_event_levels_room_id ON room_send_event_levels(room_id);
+
+
+CREATE TABLE IF NOT EXISTS room_ops_levels(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ ban_level INTEGER,
+ kick_level INTEGER
+);
+
+CREATE INDEX IF NOT EXISTS room_ops_levels_event_id ON room_ops_levels(event_id);
+CREATE INDEX IF NOT EXISTS room_ops_levels_room_id ON room_ops_levels(room_id);
+
+
CREATE TABLE IF NOT EXISTS room_hosts(
room_id TEXT NOT NULL,
host TEXT NOT NULL,
CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
);
+
+CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/keys.sql b/synapse/storage/schema/keys.sql
new file mode 100644
index 0000000000..45cdbcecae
--- /dev/null
+++ b/synapse/storage/schema/keys.sql
@@ -0,0 +1,30 @@
+/* Copyright 2014 matrix.org
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS server_tls_certificates(
+ server_name TEXT, -- Server name.
+ key_server TEXT, -- Which key server the certificate was fetched from.
+ ts_added_ms INTEGER, -- When the certificate was added.
+ tls_certificate BLOB, -- DER encoded x509 certificate.
+ CONSTRAINT uniqueness UNIQUE (server_name)
+);
+
+CREATE TABLE IF NOT EXISTS server_signature_keys(
+ server_name TEXT, -- Server name.
+ key_version TEXT, -- Key version.
+ key_server TEXT, -- Which key server the key was fetched from.
+ ts_added_ms INTEGER, -- When the key was added.
+ verification_key BLOB, -- NACL verification key.
+ CONSTRAINT uniqueness UNIQUE (server_name, key_version)
+);
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index c68cf1a59c..08d6e6f733 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -17,6 +17,10 @@ from twisted.internet import defer
from synapse.types import StreamToken
+from synapse.handlers.presence import PresenceEventSource
+from synapse.handlers.room import RoomEventSource
+from synapse.handlers.typing import TypingNotificationEventSource
+
class NullSource(object):
"""This event source never yields any events and its token remains at
@@ -24,146 +28,21 @@ class NullSource(object):
def __init__(self, hs):
pass
- def get_new_events_for_user(self, user, from_token, limit):
- return defer.succeed(([], from_token))
+ def get_new_events_for_user(self, user, from_key, limit):
+ return defer.succeed(([], from_key))
- def get_current_token_part(self):
+ def get_current_key(self):
return defer.succeed(0)
def get_pagination_rows(self, user, pagination_config, key):
return defer.succeed(([], pagination_config.from_token))
-class RoomEventSource(object):
- def __init__(self, hs):
- self.store = hs.get_datastore()
-
- @defer.inlineCallbacks
- def get_new_events_for_user(self, user, from_token, limit):
- # We just ignore the key for now.
-
- to_key = yield self.get_current_token_part()
-
- events, end_key = yield self.store.get_room_events_stream(
- user_id=user.to_string(),
- from_key=from_token.events_key,
- to_key=to_key,
- room_id=None,
- limit=limit,
- )
-
- end_token = from_token.copy_and_replace("events_key", end_key)
-
- defer.returnValue((events, end_token))
-
- def get_current_token_part(self):
- return self.store.get_room_events_max_id()
-
- @defer.inlineCallbacks
- def get_pagination_rows(self, user, pagination_config, key):
- from_token = pagination_config.from_token
- to_token = pagination_config.to_token
- limit = pagination_config.limit
- direction = pagination_config.direction
-
- to_key = to_token.events_key if to_token else None
-
- events, next_key = yield self.store.paginate_room_events(
- room_id=key,
- from_key=from_token.events_key,
- to_key=to_key,
- direction=direction,
- limit=limit,
- with_feedback=True
- )
-
- next_token = from_token.copy_and_replace("events_key", next_key)
-
- defer.returnValue((events, next_token))
-
-
-class PresenceSource(object):
- def __init__(self, hs):
- self.hs = hs
- self.clock = hs.get_clock()
-
- def get_new_events_for_user(self, user, from_token, limit):
- from_key = int(from_token.presence_key)
-
- presence = self.hs.get_handlers().presence_handler
- cachemap = presence._user_cachemap
-
- # TODO(paul): limit, and filter by visibility
- updates = [(k, cachemap[k]) for k in cachemap
- if from_key < cachemap[k].serial]
-
- if updates:
- clock = self.clock
-
- latest_serial = max([x[1].serial for x in updates])
- data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
-
- end_token = from_token.copy_and_replace(
- "presence_key", latest_serial
- )
- return ((data, end_token))
- else:
- end_token = from_token.copy_and_replace(
- "presence_key", presence._user_cachemap_latest_serial
- )
- return (([], end_token))
-
- def get_current_token_part(self):
- presence = self.hs.get_handlers().presence_handler
- return presence._user_cachemap_latest_serial
-
- def get_pagination_rows(self, user, pagination_config, key):
- # TODO (erikj): Does this make sense? Ordering?
-
- from_token = pagination_config.from_token
- to_token = pagination_config.to_token
-
- from_key = int(from_token.presence_key)
-
- if to_token:
- to_key = int(to_token.presence_key)
- else:
- to_key = -1
-
- presence = self.hs.get_handlers().presence_handler
- cachemap = presence._user_cachemap
-
- # TODO(paul): limit, and filter by visibility
- updates = [(k, cachemap[k]) for k in cachemap
- if to_key < cachemap[k].serial < from_key]
-
- if updates:
- clock = self.clock
-
- earliest_serial = max([x[1].serial for x in updates])
- data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
-
- if to_token:
- next_token = to_token
- else:
- next_token = from_token
-
- next_token = next_token.copy_and_replace(
- "presence_key", earliest_serial
- )
- return ((data, next_token))
- else:
- if not to_token:
- to_token = from_token.copy_and_replace(
- "presence_key", 0
- )
- return (([], to_token))
-
-
class EventSources(object):
SOURCE_TYPES = {
"room": RoomEventSource,
- "presence": PresenceSource,
+ "presence": PresenceEventSource,
+ "typing": TypingNotificationEventSource,
}
def __init__(self, hs):
@@ -172,24 +51,29 @@ class EventSources(object):
for name, cls in EventSources.SOURCE_TYPES.items()
}
- @staticmethod
- def create_token(events_key, presence_key):
- return StreamToken(events_key=events_key, presence_key=presence_key)
-
@defer.inlineCallbacks
def get_current_token(self):
- events_key = yield self.sources["room"].get_current_token_part()
- presence_key = yield self.sources["presence"].get_current_token_part()
- token = EventSources.create_token(events_key, presence_key)
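+ # Ask every registered source for its current position and assemble
+ # them into a single StreamToken.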
+ token = StreamToken(
+ room_key=(
+ yield self.sources["room"].get_current_key()
+ ),
+ presence_key=(
+ yield self.sources["presence"].get_current_key()
+ ),
+ typing_key=(
+ yield self.sources["typing"].get_current_key()
+ )
+ )
defer.returnValue(token)
class StreamSource(object):
- def get_new_events_for_user(self, user, from_token, limit):
+ def get_new_events_for_user(self, user, from_key, limit):
+ """from_key is the key within this event source."""
raise NotImplementedError("get_new_events_for_user")
- def get_current_token_part(self):
- raise NotImplementedError("get_current_token_part")
+ def get_current_key(self):
+ raise NotImplementedError("get_current_key")
def get_pagination_rows(self, user, pagination_config, key):
raise NotImplementedError("get_rows")
diff --git a/synapse/types.py b/synapse/types.py
index 63154855dd..1a9dceabf5 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -97,7 +97,7 @@ class RoomID(DomainSpecificString):
class StreamToken(
namedtuple(
"Token",
- ("events_key", "presence_key")
+ ("room_key", "presence_key", "typing_key")
)
):
_SEPARATOR = "_"
@@ -105,21 +105,14 @@ class StreamToken(
@classmethod
def from_string(cls, string):
try:
- events_key, presence_key = string.split(cls._SEPARATOR)
+ keys = string.split(cls._SEPARATOR)
- return cls(
- events_key=events_key,
- presence_key=presence_key,
- )
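+ # One component per StreamToken field, in declaration order.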
+ return cls(*keys)
except:
raise SynapseError(400, "Invalid Token")
def to_string(self):
- return "".join([
- str(self.events_key),
- self._SEPARATOR,
- str(self.presence_key),
- ])
+ return self._SEPARATOR.join([str(k) for k in self])
def copy_and_replace(self, key, new_value):
d = self._asdict()
|