summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-02-27 10:29:24 +0000
committerRichard van der Hoff <richard@matrix.org>2019-02-27 10:29:24 +0000
commit67acd1aa1b931291a011b95488a1fb059fc8f644 (patch)
tree77b543bc3b83c79313e75fdd593f493810f9c6a6
parentMerge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes (diff)
parentLimit cache invalidation replication line length (#4748) (diff)
downloadsynapse-67acd1aa1b931291a011b95488a1fb059fc8f644.tar.xz
Merge branch 'develop' into matrix-org-hotfixes
-rw-r--r--changelog.d/4698.misc2
-rw-r--r--changelog.d/4746.feature1
-rw-r--r--changelog.d/4748.misc1
-rw-r--r--changelog.d/4750.misc1
-rw-r--r--changelog.d/4753.misc1
-rw-r--r--docs/reverse_proxy.rst2
-rwxr-xr-xscripts-dev/check-newsfragment11
-rwxr-xr-xsynapse/app/homeserver.py3
-rw-r--r--synapse/federation/transport/server.py3
-rw-r--r--synapse/groups/groups_server.py6
-rw-r--r--synapse/handlers/room_list.py68
-rw-r--r--synapse/replication/tcp/protocol.py17
-rw-r--r--synapse/storage/_base.py23
-rw-r--r--synapse/storage/engines/postgres.py25
-rw-r--r--synapse/storage/engines/sqlite.py9
15 files changed, 150 insertions, 23 deletions
diff --git a/changelog.d/4698.misc b/changelog.d/4698.misc
index d17b19bec5..9dea5dd2be 100644
--- a/changelog.d/4698.misc
+++ b/changelog.d/4698.misc
@@ -1 +1 @@
-Better checks on newsfragments
+Better checks on newsfragments.
diff --git a/changelog.d/4746.feature b/changelog.d/4746.feature
new file mode 100644
index 0000000000..97c253eccf
--- /dev/null
+++ b/changelog.d/4746.feature
@@ -0,0 +1 @@
+Prevent showing rooms to other servers that were set to not federate.
\ No newline at end of file
diff --git a/changelog.d/4748.misc b/changelog.d/4748.misc
new file mode 100644
index 0000000000..4dc18378e7
--- /dev/null
+++ b/changelog.d/4748.misc
@@ -0,0 +1 @@
+Improve replication performance by reducing cache invalidation traffic.
diff --git a/changelog.d/4750.misc b/changelog.d/4750.misc
new file mode 100644
index 0000000000..3bb9c48f1a
--- /dev/null
+++ b/changelog.d/4750.misc
@@ -0,0 +1 @@
+Better checks on newsfragments.
\ No newline at end of file
diff --git a/changelog.d/4753.misc b/changelog.d/4753.misc
new file mode 100644
index 0000000000..98532cc971
--- /dev/null
+++ b/changelog.d/4753.misc
@@ -0,0 +1 @@
+Add database version to phonehome stats.
diff --git a/docs/reverse_proxy.rst b/docs/reverse_proxy.rst
index 242935a62f..4706061eba 100644
--- a/docs/reverse_proxy.rst
+++ b/docs/reverse_proxy.rst
@@ -79,7 +79,7 @@ Let's assume that we expect clients to connect to our server at
           SSLEngine on
           ServerName example.com;
 
-          <Location />
+          <Location /_matrix>
               ProxyPass http://127.0.0.1:8008/_matrix nocanon
               ProxyPassReverse http://127.0.0.1:8008/_matrix
           </Location>
diff --git a/scripts-dev/check-newsfragment b/scripts-dev/check-newsfragment
index 5da093e168..e4a22bae61 100755
--- a/scripts-dev/check-newsfragment
+++ b/scripts-dev/check-newsfragment
@@ -6,7 +6,8 @@
 set -e
 
 # make sure that origin/develop is up to date
-git fetch origin develop
+git remote set-branches --add origin develop
+git fetch --depth=1 origin develop
 
 UPSTREAM=origin/develop
 
@@ -25,11 +26,15 @@ if git diff --name-only $UPSTREAM... | grep -qv '^develop/'; then
     tox -e check-newsfragment
 fi
 
+echo
+echo "--------------------------"
+echo
+
 # check that any new newsfiles on this branch end with a full stop.
-for f in git diff --name-only $UPSTREAM... -- changelog.d; do
+for f in `git diff --name-only $UPSTREAM... -- changelog.d`; do
     lastchar=`tr -d '\n' < $f | tail -c 1`
     if [ $lastchar != '.' ]; then
-        echo "Newsfragment $f does not end with a '.'" >&2
+        echo -e "\e[31mERROR: newsfragment $f does not end with a '.'\e[39m" >&2
         exit 1
     fi
 done
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 05a97979ec..e8b6cc3114 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -555,6 +555,9 @@ def run(hs):
                 stats["memory_rss"] += process.memory_info().rss
                 stats["cpu_average"] += int(process.cpu_percent(interval=None))
 
+        stats["database_engine"] = hs.get_datastore().database_engine_name
+        stats["database_server_version"] = hs.get_datastore().get_server_version()
+
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
             yield hs.get_simple_http_client().put_json(
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a2396ab466..5ba94be2ec 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -736,7 +736,8 @@ class PublicRoomList(BaseFederationServlet):
 
         data = yield self.handler.get_local_public_room_list(
             limit, since_token,
-            network_tuple=network_tuple
+            network_tuple=network_tuple,
+            from_federation=True,
         )
         defer.returnValue((200, data))
 
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 633c865ed8..a7eaead56b 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -113,8 +113,7 @@ class GroupsServerHandler(object):
             room_id = room_entry["room_id"]
             joined_users = yield self.store.get_users_in_room(room_id)
             entry = yield self.room_list_handler.generate_room_entry(
-                room_id, len(joined_users),
-                with_alias=False, allow_private=True,
+                room_id, len(joined_users), with_alias=False, allow_private=True,
             )
             entry = dict(entry)  # so we don't change whats cached
             entry.pop("room_id", None)
@@ -544,8 +543,7 @@ class GroupsServerHandler(object):
 
             joined_users = yield self.store.get_users_in_room(room_id)
             entry = yield self.room_list_handler.generate_room_entry(
-                room_id, len(joined_users),
-                with_alias=False, allow_private=True,
+                room_id, len(joined_users), with_alias=False, allow_private=True,
             )
 
             if not entry:
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 1ec2b734e9..c5847def0f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -53,16 +53,17 @@ class RoomListHandler(BaseHandler):
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
-                                   network_tuple=EMPTY_THIRD_PARTY_ID,):
+                                   network_tuple=EMPTY_THIRD_PARTY_ID,
+                                   from_federation=False):
         """Generate a local public room list.
 
         There are multiple different lists: the main one plus one per third
         party network. A client can ask for a specific list or to return all.
 
         Args:
-            limit (int)
-            since_token (str)
-            search_filter (dict)
+            limit (int|None)
+            since_token (str|None)
+            search_filter (dict|None)
             network_tuple (ThirdPartyInstanceID): Which public list to use.
                 This can be (None, None) to indicate the main list, or a particular
                 appservice and network id to use an appservice specific one.
@@ -90,14 +91,30 @@ class RoomListHandler(BaseHandler):
         return self.response_cache.wrap(
             key,
             self._get_public_room_list,
-            limit, since_token, network_tuple=network_tuple,
+            limit, since_token,
+            network_tuple=network_tuple, from_federation=from_federation,
         )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
                               search_filter=None,
                               network_tuple=EMPTY_THIRD_PARTY_ID,
+                              from_federation=False,
                               timeout=None,):
+        """Generate a public room list.
+        Args:
+            limit (int|None): Maximum amount of rooms to return.
+            since_token (str|None)
+            search_filter (dict|None): Dictionary to filter rooms by.
+            network_tuple (ThirdPartyInstanceID): Which public list to use.
+                This can be (None, None) to indicate the main list, or a particular
+                appservice and network id to use an appservice specific one.
+                Setting to None returns all public rooms across all lists.
+            from_federation (bool): Whether this request originated from a
+                federating server or a client. Used for room filtering.
+            timeout (int|None): Amount of seconds to wait for a response before
+                timing out.
+        """
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
@@ -220,7 +237,8 @@ class RoomListHandler(BaseHandler):
             yield concurrently_execute(
                 lambda r: self._append_room_entry_to_chunk(
                     r, rooms_to_num_joined[r],
-                    chunk, limit, search_filter
+                    chunk, limit, search_filter,
+                    from_federation=from_federation,
                 ),
                 batch, 5,
             )
@@ -291,23 +309,51 @@ class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
-                                    search_filter):
+                                    search_filter, from_federation=False):
         """Generate the entry for a room in the public room list and append it
         to the `chunk` if it matches the search filter
+
+        Args:
+            room_id (str): The ID of the room.
+            num_joined_users (int): The number of joined users in the room.
+            chunk (list)
+            limit (int|None): Maximum amount of rooms to display. Function will
+                return if length of chunk is greater than limit + 1.
+            search_filter (dict|None)
+            from_federation (bool): Whether this request originated from a
+                federating server or a client. Used for room filtering.
         """
         if limit and len(chunk) > limit + 1:
             # We've already got enough, so lets just drop it.
             return
 
         result = yield self.generate_room_entry(room_id, num_joined_users)
+        if not result:
+            return
+
+        if from_federation and not result.get("m.federate", True):
+            # This is a room that other servers cannot join. Do not show them
+            # this room.
+            return
 
-        if result and _matches_room_entry(result, search_filter):
+        if _matches_room_entry(result, search_filter):
             chunk.append(result)
 
     @cachedInlineCallbacks(num_args=1, cache_context=True)
     def generate_room_entry(self, room_id, num_joined_users, cache_context,
                             with_alias=True, allow_private=False):
         """Returns the entry for a room
+
+        Args:
+            room_id (str): The room's ID.
+            num_joined_users (int): Number of users in the room.
+            cache_context: Information for cached responses.
+            with_alias (bool): Whether to return the room's aliases in the result.
+            allow_private (bool): Whether invite-only rooms should be shown.
+
+        Returns:
+            Deferred[dict|None]: Returns a room entry as a dictionary, or None if this
+            room was determined not to be shown publicly.
         """
         result = {
             "room_id": room_id,
@@ -321,6 +367,7 @@ class RoomListHandler(BaseHandler):
         event_map = yield self.store.get_events([
             event_id for key, event_id in iteritems(current_state_ids)
             if key[0] in (
+                EventTypes.Create,
                 EventTypes.JoinRules,
                 EventTypes.Name,
                 EventTypes.Topic,
@@ -337,12 +384,17 @@ class RoomListHandler(BaseHandler):
         }
 
         # Double check that this is actually a public room.
+
         join_rules_event = current_state.get((EventTypes.JoinRules, ""))
         if join_rules_event:
             join_rule = join_rules_event.content.get("join_rule", None)
             if not allow_private and join_rule and join_rule != JoinRules.PUBLIC:
                 defer.returnValue(None)
 
+        # Return whether this room is open to federation users or not
+        create_event = current_state.get((EventTypes.Create, ""))
+        result["m.federate"] = create_event.content.get("m.federate", True)
+
         if with_alias:
             aliases = yield self.store.get_aliases_for_room(
                 room_id, on_invalidate=cache_context.invalidate
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 75c9e8355f..530bd3756c 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -270,7 +270,17 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         if "\n" in string:
             raise Exception("Unexpected newline in command: %r", string)
 
-        self.sendLine(string.encode("utf-8"))
+        encoded_string = string.encode("utf-8")
+
+        if len(encoded_string) > self.MAX_LENGTH:
+            raise Exception(
+                "Failed to send command %s as too long (%d > %d)" % (
+                    cmd.NAME,
+                    len(encoded_string), self.MAX_LENGTH,
+                )
+            )
+
+        self.sendLine(encoded_string)
 
         self.last_sent_command = self.clock.time_msec()
 
@@ -365,6 +375,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
     def id(self):
         return "%s-%s" % (self.name, self.conn_id)
 
+    def lineLengthExceeded(self, line):
+        """Called when we receive a line that is above the maximum line length
+        """
+        self.send_error("Line length exceeded")
+
 
 class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 5a80eef211..a0333d5309 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -30,6 +30,7 @@ from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id
+from synapse.util import batch_iter
 from synapse.util.caches.descriptors import Cache
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.stringutils import exception_to_unicode
@@ -1327,10 +1328,16 @@ class SQLBaseStore(object):
         """
         txn.call_after(self._invalidate_state_caches, room_id, members_changed)
 
-        keys = itertools.chain([room_id], members_changed)
-        self._send_invalidation_to_replication(
-            txn, _CURRENT_STATE_CACHE_NAME, keys,
-        )
+        # We need to be careful that the size of the `members_changed` list
+        # isn't so large that it causes problems sending over replication, so we
+        # send them in chunks.
+        # Max line length is 16K, and max user ID length is 255, so 50 should
+        # be safe.
+        for chunk in batch_iter(members_changed, 50):
+            keys = itertools.chain([room_id], chunk)
+            self._send_invalidation_to_replication(
+                txn, _CURRENT_STATE_CACHE_NAME, keys,
+            )
 
     def _invalidate_state_caches(self, room_id, members_changed):
         """Invalidates caches that are based on the current state, but does
@@ -1596,6 +1603,14 @@ class SQLBaseStore(object):
 
         return cls.cursor_to_dict(txn)
 
+    @property
+    def database_engine_name(self):
+        return self.database_engine.module.__name__
+
+    def get_server_version(self):
+        """Returns a string describing the server version number"""
+        return self.database_engine.server_version
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 4004427c7b..dc3238501c 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -23,6 +23,7 @@ class PostgresEngine(object):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
         self.synchronous_commit = database_config.get("synchronous_commit", True)
+        self._version = None   # unknown as yet
 
     def check_database(self, txn):
         txn.execute("SHOW SERVER_ENCODING")
@@ -87,3 +88,27 @@ class PostgresEngine(object):
         """
         txn.execute("SELECT nextval('state_group_id_seq')")
         return txn.fetchone()[0]
+
+    @property
+    def server_version(self):
+        """Returns a string giving the server version. For example: '8.1.5'
+
+        Returns:
+            string
+        """
+        # note that this is a bit of a hack because it relies on on_new_connection
+        # having been called at least once. Still, that should be a safe bet here.
+        numver = self._version
+        assert numver is not None
+
+        # https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQSERVERVERSION
+        if numver >= 100000:
+            return "%i.%i" % (
+                numver / 10000, numver % 10000,
+            )
+        else:
+            return "%i.%i.%i" % (
+                numver / 10000,
+                (numver % 10000) / 100,
+                numver % 100,
+            )
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 059ab81055..1bcd5b99a4 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -70,6 +70,15 @@ class Sqlite3Engine(object):
             self._current_state_group_id += 1
             return self._current_state_group_id
 
+    @property
+    def server_version(self):
+        """Gets a string giving the server version. For example: '3.22.0'
+
+        Returns:
+            string
+        """
+        return "%i.%i.%i" % self.module.sqlite_version_info
+
 
 # Following functions taken from: https://github.com/coleifer/peewee