summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/room_versions.py4
-rw-r--r--synapse/config/server.py33
-rw-r--r--synapse/crypto/keyring.py11
-rw-r--r--synapse/events/builder.py7
-rw-r--r--synapse/events/utils.py5
-rw-r--r--synapse/handlers/events.py3
-rw-r--r--synapse/handlers/message.py3
-rw-r--r--synapse/handlers/presence.py99
-rw-r--r--synapse/handlers/room.py9
-rw-r--r--synapse/http/servlet.py2
-rw-r--r--synapse/rest/client/v2_alpha/capabilities.py5
-rw-r--r--synapse/rest/client/v2_alpha/sync.py3
-rw-r--r--synapse/storage/schema/delta/54/account_validity_with_renewal.sql (renamed from synapse/storage/schema/delta/54/account_validity.sql)3
-rw-r--r--synapse/storage/stats.py2
14 files changed, 117 insertions, 72 deletions
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index b2895355a8..4085bd10b9 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -85,10 +85,6 @@ class RoomVersions(object):
     )
 
 
-# the version we will give rooms which are created on this server
-DEFAULT_ROOM_VERSION = RoomVersions.V1
-
-
 KNOWN_ROOM_VERSIONS = {
     v.identifier: v for v in (
         RoomVersions.V1,
diff --git a/synapse/config/server.py b/synapse/config/server.py
index f34aa42afa..e763e19e15 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -20,6 +20,7 @@ import os.path
 
 from netaddr import IPSet
 
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.python_dependencies import DependencyException, check_requirements
 
@@ -35,6 +36,8 @@ logger = logging.Logger(__name__)
 # in the list.
 DEFAULT_BIND_ADDRESSES = ['::', '0.0.0.0']
 
+DEFAULT_ROOM_VERSION = "1"
+
 
 class ServerConfig(Config):
 
@@ -88,6 +91,22 @@ class ServerConfig(Config):
             "restrict_public_rooms_to_local_users", False,
         )
 
+        default_room_version = config.get(
+            "default_room_version", DEFAULT_ROOM_VERSION,
+        )
+
+        # Ensure room version is a str
+        default_room_version = str(default_room_version)
+
+        if default_room_version not in KNOWN_ROOM_VERSIONS:
+            raise ConfigError(
+                "Unknown default_room_version: %s, known room versions: %s" %
+                (default_room_version, list(KNOWN_ROOM_VERSIONS.keys()))
+            )
+
+        # Get the actual room version object rather than just the identifier
+        self.default_room_version = KNOWN_ROOM_VERSIONS[default_room_version]
+
         # whether to enable search. If disabled, new entries will not be inserted
         # into the search tables and they will not be indexed. Users will receive
         # errors when attempting to search for messages.
@@ -310,6 +329,10 @@ class ServerConfig(Config):
             unsecure_port = 8008
 
         pid_file = os.path.join(data_dir_path, "homeserver.pid")
+
+        # Bring DEFAULT_ROOM_VERSION into the local-scope for use in the
+        # default config string
+        default_room_version = DEFAULT_ROOM_VERSION
         return """\
         ## Server ##
 
@@ -384,6 +407,16 @@ class ServerConfig(Config):
         #
         #restrict_public_rooms_to_local_users: true
 
+        # The default room version for newly created rooms.
+        #
+        # Known room versions are listed here:
+        # https://matrix.org/docs/spec/#complete-list-of-room-versions
+        #
+        # For example, for room version 1, default_room_version should be set
+        # to "1".
+        #
+        #default_room_version: "%(default_room_version)s"
+
         # The GC threshold parameters to pass to `gc.set_threshold`, if defined
         #
         #gc_thresholds: [700, 10, 10]
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 65af2fb671..c63f106cf3 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -181,9 +181,7 @@ class Keyring(object):
 
             # We want to wait for any previous lookups to complete before
             # proceeding.
-            yield self.wait_for_previous_lookups(
-                [rq.server_name for rq in verify_requests], server_to_deferred
-            )
+            yield self.wait_for_previous_lookups(server_to_deferred)
 
             # Actually start fetching keys.
             self._get_server_verify_keys(verify_requests)
@@ -216,12 +214,11 @@ class Keyring(object):
             logger.exception("Error starting key lookups")
 
     @defer.inlineCallbacks
-    def wait_for_previous_lookups(self, server_names, server_to_deferred):
+    def wait_for_previous_lookups(self, server_to_deferred):
         """Waits for any previous key lookups for the given servers to finish.
 
         Args:
-            server_names (list): list of server_names we want to lookup
-            server_to_deferred (dict): server_name to deferred which gets
+            server_to_deferred (dict[str, Deferred]): server_name to deferred which gets
                 resolved once we've finished looking up keys for that server.
                 The Deferreds should be regular twisted ones which call their
                 callbacks with no logcontext.
@@ -234,7 +231,7 @@ class Keyring(object):
         while True:
             wait_on = [
                 (server_name, self.key_downloads[server_name])
-                for server_name in server_names
+                for server_name in server_to_deferred.keys()
                 if server_name in self.key_downloads
             ]
             if not wait_on:
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 1fe995f212..546b6f4982 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -76,6 +76,7 @@ class EventBuilder(object):
     # someone tries to get them when they don't exist.
     _state_key = attr.ib(default=None)
     _redacts = attr.ib(default=None)
+    _origin_server_ts = attr.ib(default=None)
 
     internal_metadata = attr.ib(default=attr.Factory(lambda: _EventInternalMetadata({})))
 
@@ -142,6 +143,9 @@ class EventBuilder(object):
         if self._redacts is not None:
             event_dict["redacts"] = self._redacts
 
+        if self._origin_server_ts is not None:
+            event_dict["origin_server_ts"] = self._origin_server_ts
+
         defer.returnValue(
             create_local_event_from_event_dict(
                 clock=self._clock,
@@ -209,6 +213,7 @@ class EventBuilderFactory(object):
             content=key_values.get("content", {}),
             unsigned=key_values.get("unsigned", {}),
             redacts=key_values.get("redacts", None),
+            origin_server_ts=key_values.get("origin_server_ts", None),
         )
 
 
@@ -245,7 +250,7 @@ def create_local_event_from_event_dict(clock, hostname, signing_key,
         event_dict["event_id"] = _create_event_id(clock, hostname)
 
     event_dict["origin"] = hostname
-    event_dict["origin_server_ts"] = time_now
+    event_dict.setdefault("origin_server_ts", time_now)
 
     event_dict.setdefault("unsigned", {})
     age = event_dict["unsigned"].pop("age", 0)
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 27a2a9ef98..e2d4384de1 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -330,12 +330,13 @@ class EventClientSerializer(object):
         )
 
     @defer.inlineCallbacks
-    def serialize_event(self, event, time_now, **kwargs):
+    def serialize_event(self, event, time_now, bundle_aggregations=True, **kwargs):
         """Serializes a single event.
 
         Args:
             event (EventBase)
             time_now (int): The current time in milliseconds
+            bundle_aggregations (bool): Whether to bundle in related events
             **kwargs: Arguments to pass to `serialize_event`
 
         Returns:
@@ -350,7 +351,7 @@ class EventClientSerializer(object):
 
         # If MSC1849 is enabled then we need to look if thre are any relations
         # we need to bundle in with the event
-        if self.experimental_msc1849_support_enabled:
+        if self.experimental_msc1849_support_enabled and bundle_aggregations:
             annotations = yield self.store.get_aggregation_groups_for_event(
                 event_id,
             )
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 6003ad9cca..eb525070cf 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -122,6 +122,9 @@ class EventStreamHandler(BaseHandler):
 
             chunks = yield self._event_serializer.serialize_events(
                 events, time_now, as_client_event=as_client_event,
+                # We don't bundle "live" events, as otherwise clients
+                # will end up double counting annotations.
+                bundle_aggregations=False,
             )
 
             chunk = {
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 792edc7579..0b02469ceb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -166,6 +166,9 @@ class MessageHandler(object):
         now = self.clock.time_msec()
         events = yield self._event_serializer.serialize_events(
             room_state.values(), now,
+            # We don't bother bundling aggregations in when asked for state
+            # events, as clients won't use them.
+            bundle_aggregations=False,
         )
         defer.returnValue(events)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 59d53f1050..6209858bbb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -182,17 +182,27 @@ class PresenceHandler(object):
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
+        def run_timeout_handler():
+            return run_as_background_process(
+                "handle_presence_timeouts", self._handle_timeouts
+            )
+
         self.clock.call_later(
             30,
             self.clock.looping_call,
-            self._handle_timeouts,
+            run_timeout_handler,
             5000,
         )
 
+        def run_persister():
+            return run_as_background_process(
+                "persist_presence_changes", self._persist_unpersisted_changes
+            )
+
         self.clock.call_later(
             60,
             self.clock.looping_call,
-            self._persist_unpersisted_changes,
+            run_persister,
             60 * 1000,
         )
 
@@ -229,6 +239,7 @@ class PresenceHandler(object):
         )
 
         if self.unpersisted_users_changes:
+
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in self.unpersisted_users_changes
@@ -240,30 +251,18 @@ class PresenceHandler(object):
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
         """
-        logger.info(
-            "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
-            len(self.unpersisted_users_changes)
-        )
-
         unpersisted = self.unpersisted_users_changes
         self.unpersisted_users_changes = set()
 
         if unpersisted:
+            logger.info(
+                "Persisting %d upersisted presence updates", len(unpersisted)
+            )
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in unpersisted
             ])
 
-        logger.info("Finished _persist_unpersisted_changes")
-
-    @defer.inlineCallbacks
-    def _update_states_and_catch_exception(self, new_states):
-        try:
-            res = yield self._update_states(new_states)
-            defer.returnValue(res)
-        except Exception:
-            logger.exception("Error updating presence")
-
     @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -338,45 +337,41 @@ class PresenceHandler(object):
         logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        try:
-            with Measure(self.clock, "presence_handle_timeouts"):
-                # Fetch the list of users that *may* have timed out. Things may have
-                # changed since the timeout was set, so we won't necessarily have to
-                # take any action.
-                users_to_check = set(self.wheel_timer.fetch(now))
-
-                # Check whether the lists of syncing processes from an external
-                # process have expired.
-                expired_process_ids = [
-                    process_id for process_id, last_update
-                    in self.external_process_last_updated_ms.items()
-                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
-                ]
-                for process_id in expired_process_ids:
-                    users_to_check.update(
-                        self.external_process_last_updated_ms.pop(process_id, ())
-                    )
-                    self.external_process_last_update.pop(process_id)
+        # Fetch the list of users that *may* have timed out. Things may have
+        # changed since the timeout was set, so we won't necessarily have to
+        # take any action.
+        users_to_check = set(self.wheel_timer.fetch(now))
+
+        # Check whether the lists of syncing processes from an external
+        # process have expired.
+        expired_process_ids = [
+            process_id for process_id, last_update
+            in self.external_process_last_updated_ms.items()
+            if now - last_update > EXTERNAL_PROCESS_EXPIRY
+        ]
+        for process_id in expired_process_ids:
+            users_to_check.update(
+                self.external_process_last_updated_ms.pop(process_id, ())
+            )
+            self.external_process_last_update.pop(process_id)
 
-                states = [
-                    self.user_to_current_state.get(
-                        user_id, UserPresenceState.default(user_id)
-                    )
-                    for user_id in users_to_check
-                ]
+        states = [
+            self.user_to_current_state.get(
+                user_id, UserPresenceState.default(user_id)
+            )
+            for user_id in users_to_check
+        ]
 
-                timers_fired_counter.inc(len(states))
+        timers_fired_counter.inc(len(states))
 
-                changes = handle_timeouts(
-                    states,
-                    is_mine_fn=self.is_mine_id,
-                    syncing_user_ids=self.get_currently_syncing_users(),
-                    now=now,
-                )
+        changes = handle_timeouts(
+            states,
+            is_mine_fn=self.is_mine_id,
+            syncing_user_ids=self.get_currently_syncing_users(),
+            now=now,
+        )
 
-            run_in_background(self._update_states_and_catch_exception, changes)
-        except Exception:
-            logger.exception("Exception in _handle_timeouts loop")
+        return self._update_states(changes)
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e37ae96899..4a17911a87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -27,7 +27,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
@@ -70,6 +70,7 @@ class RoomCreationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
+        self.config = hs.config
 
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@@ -475,7 +476,11 @@ class RoomCreationHandler(BaseHandler):
         if ratelimit:
             yield self.ratelimit(requester)
 
-        room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
+        room_version = config.get(
+            "room_version",
+            self.config.default_room_version.identifier,
+        )
+
         if not isinstance(room_version, string_types):
             raise SynapseError(
                 400,
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 528125e737..197c652850 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -55,7 +55,7 @@ def parse_integer_from_args(args, name, default=None, required=False):
             return int(args[name][0])
         except Exception:
             message = "Query parameter %r must be an integer" % (name,)
-            raise SynapseError(400, message)
+            raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
     else:
         if required:
             message = "Missing integer query parameter %r" % (name,)
diff --git a/synapse/rest/client/v2_alpha/capabilities.py b/synapse/rest/client/v2_alpha/capabilities.py
index a868d06098..2b4892330c 100644
--- a/synapse/rest/client/v2_alpha/capabilities.py
+++ b/synapse/rest/client/v2_alpha/capabilities.py
@@ -16,7 +16,7 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.http.servlet import RestServlet
 
 from ._base import client_v2_patterns
@@ -36,6 +36,7 @@ class CapabilitiesRestServlet(RestServlet):
         """
         super(CapabilitiesRestServlet, self).__init__()
         self.hs = hs
+        self.config = hs.config
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
 
@@ -48,7 +49,7 @@ class CapabilitiesRestServlet(RestServlet):
         response = {
             "capabilities": {
                 "m.room_versions": {
-                    "default": DEFAULT_ROOM_VERSION.identifier,
+                    "default": self.config.default_room_version.identifier,
                     "available": {
                         v.identifier: v.disposition
                         for v in KNOWN_ROOM_VERSIONS.values()
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index c701e534e7..d3025025e3 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -358,6 +358,9 @@ class SyncRestServlet(RestServlet):
         def serialize(events):
             return self._event_serializer.serialize_events(
                 events, time_now=time_now,
+                # We don't bundle "live" events, as otherwise clients
+                # will end up double counting annotations.
+                bundle_aggregations=False,
                 token_id=token_id,
                 event_format=event_formatter,
                 only_event_fields=only_fields,
diff --git a/synapse/storage/schema/delta/54/account_validity.sql b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
index 2357626000..0adb2ad55e 100644
--- a/synapse/storage/schema/delta/54/account_validity.sql
+++ b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
@@ -13,6 +13,9 @@
  * limitations under the License.
  */
 
+-- We previously changed the schema for this table without renaming the file, which means
+-- that some databases might still be using the old schema. This ensures Synapse uses the
+-- right schema for the table.
 DROP TABLE IF EXISTS account_validity;
 
 -- Track what users are in public rooms.
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 71b80a891d..eb0ced5b5e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -169,7 +169,7 @@ class StatsStore(StateDeltasStore):
 
         logger.info(
             "Processing the next %d rooms of %d remaining",
-            (len(rooms_to_work_on), progress["remaining"]),
+            len(rooms_to_work_on), progress["remaining"],
         )
 
         # Number of state events we've processed by going through each room