summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py63
-rw-r--r--synapse/storage/engines/postgres.py25
-rw-r--r--synapse/storage/engines/sqlite.py9
-rw-r--r--synapse/storage/registration.py66
4 files changed, 120 insertions, 43 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 3d895da43c..a0333d5309 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -30,6 +30,7 @@ from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id
+from synapse.util import batch_iter
 from synapse.util.caches.descriptors import Cache
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.stringutils import exception_to_unicode
@@ -1327,10 +1328,16 @@ class SQLBaseStore(object):
         """
         txn.call_after(self._invalidate_state_caches, room_id, members_changed)
 
-        keys = itertools.chain([room_id], members_changed)
-        self._send_invalidation_to_replication(
-            txn, _CURRENT_STATE_CACHE_NAME, keys,
-        )
+        # We need to be careful that the size of the `members_changed` list
+        # isn't so large that it causes problems sending over replication, so we
+        # send them in chunks.
+        # Max line length is 16K, and max user ID length is 255, so 50 should
+        # be safe.
+        for chunk in batch_iter(members_changed, 50):
+            keys = itertools.chain([room_id], chunk)
+            self._send_invalidation_to_replication(
+                txn, _CURRENT_STATE_CACHE_NAME, keys,
+            )
 
     def _invalidate_state_caches(self, room_id, members_changed):
         """Invalidates caches that are based on the current state, but does
@@ -1342,15 +1349,43 @@ class SQLBaseStore(object):
                 changed
         """
         for member in members_changed:
-            self.get_rooms_for_user_with_stream_ordering.invalidate((member,))
+            self._attempt_to_invalidate_cache(
+                "get_rooms_for_user_with_stream_ordering", (member,),
+            )
 
         for host in set(get_domain_from_id(u) for u in members_changed):
-            self.is_host_joined.invalidate((room_id, host))
-            self.was_host_joined.invalidate((room_id, host))
+            self._attempt_to_invalidate_cache(
+                "is_host_joined", (room_id, host,),
+            )
+            self._attempt_to_invalidate_cache(
+                "was_host_joined", (room_id, host,),
+            )
+
+        self._attempt_to_invalidate_cache(
+            "get_users_in_room", (room_id,),
+        )
+        self._attempt_to_invalidate_cache(
+            "get_room_summary", (room_id,),
+        )
+        self._attempt_to_invalidate_cache(
+            "get_current_state_ids", (room_id,),
+        )
+
+    def _attempt_to_invalidate_cache(self, cache_name, key):
+        """Attempts to invalidate the cache of the given name, ignoring if the
+        cache doesn't exist. Mainly used for invalidating caches on workers,
+        where they may not have the cache.
 
-        self.get_users_in_room.invalidate((room_id,))
-        self.get_room_summary.invalidate((room_id,))
-        self.get_current_state_ids.invalidate((room_id,))
+        Args:
+            cache_name (str)
+            key (tuple)
+        """
+        try:
+            getattr(self, cache_name).invalidate(key)
+        except AttributeError:
+            # We probably haven't pulled in the cache in this worker,
+            # which is fine.
+            pass
 
     def _send_invalidation_to_replication(self, txn, cache_name, keys):
         """Notifies replication that given cache has been invalidated.
@@ -1568,6 +1603,14 @@ class SQLBaseStore(object):
 
         return cls.cursor_to_dict(txn)
 
+    @property
+    def database_engine_name(self):
+        return self.database_engine.module.__name__
+
+    def get_server_version(self):
+        """Returns a string describing the server version number"""
+        return self.database_engine.server_version
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 4004427c7b..dc3238501c 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -23,6 +23,7 @@ class PostgresEngine(object):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
         self.synchronous_commit = database_config.get("synchronous_commit", True)
+        self._version = None   # unknown as yet
 
     def check_database(self, txn):
         txn.execute("SHOW SERVER_ENCODING")
@@ -87,3 +88,27 @@ class PostgresEngine(object):
         """
         txn.execute("SELECT nextval('state_group_id_seq')")
         return txn.fetchone()[0]
+
+    @property
+    def server_version(self):
+        """Returns a string giving the server version. For example: '8.1.5'
+
+        Returns:
+            string
+        """
+        # note that this is a bit of a hack because it relies on on_new_connection
+        # having been called at least once. Still, that should be a safe bet here.
+        numver = self._version
+        assert numver is not None
+
+        # https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQSERVERVERSION
+        if numver >= 100000:
+            return "%i.%i" % (
+                numver / 10000, numver % 10000,
+            )
+        else:
+            return "%i.%i.%i" % (
+                numver / 10000,
+                (numver % 10000) / 100,
+                numver % 100,
+            )
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 059ab81055..1bcd5b99a4 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -70,6 +70,15 @@ class Sqlite3Engine(object):
             self._current_state_group_id += 1
             return self._current_state_group_id
 
+    @property
+    def server_version(self):
+        """Gets a string giving the server version. For example: '3.22.0'
+
+        Returns:
+            string
+        """
+        return "%i.%i.%i" % self.module.sqlite_version_info
+
 
 # Following functions taken from: https://github.com/coleifer/peewee
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9b9572890b..9b6c28892c 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -295,6 +295,39 @@ class RegistrationWorkerStore(SQLBaseStore):
             return ret['user_id']
         return None
 
+    @defer.inlineCallbacks
+    def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
+        yield self._simple_upsert("user_threepids", {
+            "medium": medium,
+            "address": address,
+        }, {
+            "user_id": user_id,
+            "validated_at": validated_at,
+            "added_at": added_at,
+        })
+
+    @defer.inlineCallbacks
+    def user_get_threepids(self, user_id):
+        ret = yield self._simple_select_list(
+            "user_threepids", {
+                "user_id": user_id
+            },
+            ['medium', 'address', 'validated_at', 'added_at'],
+            'user_get_threepids'
+        )
+        defer.returnValue(ret)
+
+    def user_delete_threepid(self, user_id, medium, address):
+        return self._simple_delete(
+            "user_threepids",
+            keyvalues={
+                "user_id": user_id,
+                "medium": medium,
+                "address": address,
+            },
+            desc="user_delete_threepids",
+        )
+
 
 class RegistrationStore(RegistrationWorkerStore,
                         background_updates.BackgroundUpdateStore):
@@ -633,39 +666,6 @@ class RegistrationStore(RegistrationWorkerStore,
         defer.returnValue(res if res else False)
 
     @defer.inlineCallbacks
-    def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
-        yield self._simple_upsert("user_threepids", {
-            "medium": medium,
-            "address": address,
-        }, {
-            "user_id": user_id,
-            "validated_at": validated_at,
-            "added_at": added_at,
-        })
-
-    @defer.inlineCallbacks
-    def user_get_threepids(self, user_id):
-        ret = yield self._simple_select_list(
-            "user_threepids", {
-                "user_id": user_id
-            },
-            ['medium', 'address', 'validated_at', 'added_at'],
-            'user_get_threepids'
-        )
-        defer.returnValue(ret)
-
-    def user_delete_threepid(self, user_id, medium, address):
-        return self._simple_delete(
-            "user_threepids",
-            keyvalues={
-                "user_id": user_id,
-                "medium": medium,
-                "address": address,
-            },
-            desc="user_delete_threepids",
-        )
-
-    @defer.inlineCallbacks
     def save_or_get_3pid_guest_access_token(
             self, medium, address, access_token, inviter_user_id
     ):