Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/_base.py                                    2
-rw-r--r--  synapse/storage/events.py                                  13
-rw-r--r--  synapse/storage/events_worker.py                            8
-rw-r--r--  synapse/storage/registration.py                           111
-rw-r--r--  synapse/storage/relations.py                                2
-rw-r--r--  synapse/storage/schema/delta/55/access_token_expiry.sql   18
-rw-r--r--  synapse/storage/stream.py                                  16
-rw-r--r--  synapse/storage/transactions.py                            28
8 files changed, 75 insertions(+), 123 deletions(-)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 29589853c6..2f940dbae6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -30,12 +30,12 @@ from prometheus_client import Histogram
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.logging.context import LoggingContext, PreserveLoggingContext
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id
 from synapse.util import batch_iter
 from synapse.util.caches.descriptors import Cache
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.stringutils import exception_to_unicode
 
 # import a function which will return a monotonic time, in seconds
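
The hunk above is part of a tree-wide move of the logcontext utilities from
synapse.util.logcontext to synapse.logging.context. A minimal sketch of the
kind of backwards-compatibility shim such a module move is typically paired
with (hypothetical; the real codebase may handle the transition differently):

    # synapse/util/logcontext.py (hypothetical shim)
    import warnings

    # re-export the moved names so that old import paths keep working
    from synapse.logging.context import (  # noqa: F401
        LoggingContext,
        PreserveLoggingContext,
        make_deferred_yieldable,
        run_in_background,
    )

    warnings.warn(
        "synapse.util.logcontext is deprecated; use synapse.logging.context",
        DeprecationWarning,
        stacklevel=2,
    )
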
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index fefba39ea1..b486ca50eb 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,6 +33,8 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.utils import log_function
 from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
@@ -45,8 +47,6 @@ from synapse.util import batch_iter
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
-from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -253,7 +253,14 @@ class EventsStore(
         )
 
         # Read the extrems every 60 minutes
-        hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+        def read_forward_extremities():
+            # run as a background process to make sure that the database transactions
+            # have a logcontext to report to
+            return run_as_background_process(
+                "read_forward_extremities", self._read_forward_extremities
+            )
+
+        hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
 
     @defer.inlineCallbacks
     def _read_forward_extremities(self):
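
The looping-call change above exists because looping_call fires its callback
outside of any logcontext; wrapping the work in run_as_background_process
gives each run a named context for its database transactions to report to.
The same pattern in isolation (function names here are illustrative):

    from synapse.metrics.background_process_metrics import run_as_background_process

    def schedule_periodic_job(hs, job):
        def run_job():
            # give each run its own logcontext so that DB transaction
            # metrics and log lines are attributed to a named process
            return run_as_background_process("periodic_job", job)

        # the interval is in milliseconds; here, every 60 minutes
        hs.get_clock().looping_call(run_job, 60 * 60 * 1000)
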
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 6d680d405a..874d0a56bc 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -29,14 +29,14 @@ from synapse.api.room_versions import EventFormatVersions
 from synapse.events import FrozenEvent, event_type_from_format_version  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import get_domain_from_id
-from synapse.util.logcontext import (
+from synapse.logging.context import (
     LoggingContext,
     PreserveLoggingContext,
     make_deferred_yieldable,
     run_in_background,
 )
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import get_domain_from_id
 from synapse.util.metrics import Measure
 
 from ._base import SQLBaseStore
@@ -327,7 +327,7 @@ class EventsWorkerStore(SQLBaseStore):
 
         Args:
             events (list(str)): list of event_ids to fetch
-            allow_rejected (bool): Whether to teturn events that were rejected
+            allow_rejected (bool): Whether to return events that were rejected
             update_metrics (bool): Whether to update the cache hit ratio metrics
 
         Returns:
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 983ce13291..8b2c2a97ab 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -25,6 +25,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, StoreError, ThreepidValidationError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage import background_updates
 from synapse.storage._base import SQLBaseStore
 from synapse.types import UserID
@@ -89,7 +90,8 @@ class RegistrationWorkerStore(SQLBaseStore):
             token (str): The access token of a user.
         Returns:
             defer.Deferred: None, if the token did not match, otherwise dict
-                including the keys `name`, `is_guest`, `device_id`, `token_id`.
+                including the keys `name`, `is_guest`, `device_id`, `token_id`,
+                `valid_until_ms`.
         """
         return self.runInteraction(
             "get_user_by_access_token", self._query_for_auth, token
@@ -283,7 +285,7 @@ class RegistrationWorkerStore(SQLBaseStore):
     def _query_for_auth(self, txn, token):
         sql = (
             "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
-            " access_tokens.device_id"
+            " access_tokens.device_id, access_tokens.valid_until_ms"
             " FROM users"
             " INNER JOIN access_tokens on users.name = access_tokens.user_id"
             " WHERE token = ?"
@@ -432,19 +434,6 @@ class RegistrationWorkerStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_3pid_guest_access_token(self, medium, address):
-        ret = yield self._simple_select_one(
-            "threepid_guest_access_tokens",
-            {"medium": medium, "address": address},
-            ["guest_access_token"],
-            True,
-            "get_3pid_guest_access_token",
-        )
-        if ret:
-            defer.returnValue(ret["guest_access_token"])
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
     def get_user_id_by_threepid(self, medium, address, require_verified=False):
         """Returns user id from threepid
 
@@ -615,23 +604,29 @@ class RegistrationStore(
         )
 
         self.register_background_update_handler(
-            "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag
+            "users_set_deactivated_flag", self._background_update_set_deactivated_flag
         )
 
         # Create a background job for culling expired 3PID validity tokens
-        hs.get_clock().looping_call(
-            self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
-        )
+        def start_cull():
+            # run as a background process to make sure that the database transactions
+            # have a logcontext to report to
+            return run_as_background_process(
+                "cull_expired_threepid_validation_tokens",
+                self.cull_expired_threepid_validation_tokens,
+            )
+
+        hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
 
     @defer.inlineCallbacks
-    def _backgroud_update_set_deactivated_flag(self, progress, batch_size):
+    def _background_update_set_deactivated_flag(self, progress, batch_size):
         """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
         for each of them.
         """
 
         last_user = progress.get("user_id", "")
 
-        def _backgroud_update_set_deactivated_flag_txn(txn):
+        def _background_update_set_deactivated_flag_txn(txn):
             txn.execute(
                 """
                 SELECT
@@ -676,7 +671,7 @@ class RegistrationStore(
                 return False
 
         end = yield self.runInteraction(
-            "users_set_deactivated_flag", _backgroud_update_set_deactivated_flag_txn
+            "users_set_deactivated_flag", _background_update_set_deactivated_flag_txn
         )
 
         if end:
@@ -685,14 +680,16 @@ class RegistrationStore(
         defer.returnValue(batch_size)
 
     @defer.inlineCallbacks
-    def add_access_token_to_user(self, user_id, token, device_id=None):
+    def add_access_token_to_user(self, user_id, token, device_id, valid_until_ms):
         """Adds an access token for the given user.
 
         Args:
             user_id (str): The user ID.
             token (str): The new access token to add.
             device_id (str): ID of the device to associate with the access
-               token
+                token
+            valid_until_ms (int|None): the time (in ms since the epoch) until
+                which the token is valid. None for no expiry.
         Raises:
             StoreError if there was a problem adding this.
         """
@@ -700,14 +697,19 @@ class RegistrationStore(
 
         yield self._simple_insert(
             "access_tokens",
-            {"id": next_id, "user_id": user_id, "token": token, "device_id": device_id},
+            {
+                "id": next_id,
+                "user_id": user_id,
+                "token": token,
+                "device_id": device_id,
+                "valid_until_ms": valid_until_ms,
+            },
             desc="add_access_token_to_user",
         )
 
-    def register(
+    def register_user(
         self,
         user_id,
-        token=None,
         password_hash=None,
         was_guest=False,
         make_guest=False,
@@ -720,9 +722,6 @@ class RegistrationStore(
 
         Args:
             user_id (str): The desired user ID to register.
-            token (str): The desired access token to use for this user. If this
-                is not None, the given access token is associated with the user
-                id.
             password_hash (str): Optional. The password hash for this user.
             was_guest (bool): Optional. Whether this is a guest account being
                 upgraded to a non-guest account.
@@ -739,10 +738,9 @@ class RegistrationStore(
             StoreError if the user_id could not be registered.
         """
         return self.runInteraction(
-            "register",
-            self._register,
+            "register_user",
+            self._register_user,
             user_id,
-            token,
             password_hash,
             was_guest,
             make_guest,
@@ -752,11 +750,10 @@ class RegistrationStore(
             user_type,
         )
 
-    def _register(
+    def _register_user(
         self,
         txn,
         user_id,
-        token,
         password_hash,
         was_guest,
         make_guest,
@@ -769,8 +766,6 @@ class RegistrationStore(
 
         now = int(self.clock.time())
 
-        next_id = self._access_tokens_id_gen.get_next()
-
         try:
             if was_guest:
                 # Ensure that the guest user actually exists
@@ -818,14 +813,6 @@ class RegistrationStore(
         if self._account_validity.enabled:
             self.set_expiration_date_for_user_txn(txn, user_id)
 
-        if token:
-            # it's possible for this to get a conflict, but only for a single user
-            # since tokens are namespaced based on their user ID
-            txn.execute(
-                "INSERT INTO access_tokens(id, user_id, token)" " VALUES (?,?,?)",
-                (next_id, user_id, token),
-            )
-
         if create_profile_with_displayname:
             # set a default displayname serverside to avoid ugly race
             # between auto-joins and clients trying to set displaynames
@@ -972,40 +959,6 @@ class RegistrationStore(
 
         defer.returnValue(res if res else False)
 
-    @defer.inlineCallbacks
-    def save_or_get_3pid_guest_access_token(
-        self, medium, address, access_token, inviter_user_id
-    ):
-        """
-        Gets the 3pid's guest access token if exists, else saves access_token.
-
-        Args:
-            medium (str): Medium of the 3pid. Must be "email".
-            address (str): 3pid address.
-            access_token (str): The access token to persist if none is
-                already persisted.
-            inviter_user_id (str): User ID of the inviter.
-
-        Returns:
-            deferred str: Whichever access token is persisted at the end
-            of this function call.
-        """
-
-        def insert(txn):
-            txn.execute(
-                "INSERT INTO threepid_guest_access_tokens "
-                "(medium, address, guest_access_token, first_inviter) "
-                "VALUES (?, ?, ?, ?)",
-                (medium, address, access_token, inviter_user_id),
-            )
-
-        try:
-            yield self.runInteraction("save_3pid_guest_access_token", insert)
-            defer.returnValue(access_token)
-        except self.database_engine.module.IntegrityError:
-            ret = yield self.get_3pid_guest_access_token(medium, address)
-            defer.returnValue(ret)
-
     def add_user_pending_deactivation(self, user_id):
         """
         Adds a user to the table of users who need to be parted from all the rooms they're
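
Taken together, the registration.py changes split token creation out of user
registration: register_user no longer accepts a token, and
add_access_token_to_user now takes an expiry. A hedged sketch of the resulting
calling sequence (helper name and values are illustrative):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def register_and_issue_token(store, clock, user_id, token, device_id, lifetime_ms):
        # step 1: create the user row; register_user no longer takes a token
        yield store.register_user(user_id, password_hash=None)

        # step 2: issue the access token separately, with an expiry;
        # pass valid_until_ms=None for a token that never expires
        valid_until_ms = clock.time_msec() + lifetime_ms
        yield store.add_access_token_to_user(user_id, token, device_id, valid_until_ms)
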
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 1b01934c19..9954bc094f 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -60,7 +60,7 @@ class PaginationChunk(object):
 class RelationPaginationToken(object):
     """Pagination token for relation pagination API.
 
-    As the results are order by topological ordering, we can use the
+    As the results are in topological order, we can use the
     `topological_ordering` and `stream_ordering` fields of the events at the
     boundaries of the chunk as pagination tokens.
 
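
A minimal sketch of such a boundary token, assuming it simply serializes the
two orderings of the boundary event (the real RelationPaginationToken may use
a different wire format):

    import attr

    @attr.s(frozen=True, slots=True)
    class BoundaryToken(object):
        # orderings of the event at the edge of the returned chunk
        topological = attr.ib()  # int
        stream = attr.ib()  # int

        def to_string(self):
            return "%d-%d" % (self.topological, self.stream)

        @classmethod
        def from_string(cls, s):
            topo, stream = s.split("-")
            return cls(int(topo), int(stream))
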
diff --git a/synapse/storage/schema/delta/55/access_token_expiry.sql b/synapse/storage/schema/delta/55/access_token_expiry.sql
new file mode 100644
index 0000000000..4590604bfd
--- /dev/null
+++ b/synapse/storage/schema/delta/55/access_token_expiry.sql
@@ -0,0 +1,18 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- the time until which this access token can be used, in ms since the epoch.
+-- NULL means the token never expires.
+ALTER TABLE access_tokens ADD COLUMN valid_until_ms BIGINT;
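
A hedged sketch of how a storage method might use the new column to filter out
expired tokens (hypothetical helper; not part of this changeset):

    def _get_unexpired_tokens_txn(txn, user_id, now_ms):
        # a NULL valid_until_ms means the token never expires
        sql = (
            "SELECT token FROM access_tokens"
            " WHERE user_id = ?"
            " AND (valid_until_ms IS NULL OR valid_until_ms > ?)"
        )
        txn.execute(sql, (user_id, now_ms))
        return [token for (token,) in txn.fetchall()]
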
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d9482a3843..a0465484df 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,12 +41,12 @@ from six.moves import range
 
 from twisted.internet import defer
 
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -833,7 +833,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Returns:
             Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
             as a list of _EventDictReturn and a token that points to the end
-            of the result set.
+            of the result set. If no events are returned then either the end
+            of the stream has been reached (i.e. there are no events between
+            `from_token` and `to_token`) or `limit` is zero.
         """
 
         assert int(limit) >= 0
@@ -905,15 +907,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 only those before
             direction(char): Either 'b' or 'f' to indicate whether we are
                 paginating forwards or backwards from `from_key`.
-            limit (int): The maximum number of events to return. Zero or less
-                means no limit.
+            limit (int): The maximum number of events to return.
             event_filter (Filter|None): If provided filters the events to
                 those that match the filter.
 
         Returns:
-            tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
-            the keys "event_id", "topological_ordering" and "stream_orderign".
+            tuple[list[FrozenEvent], str]: Returns the results as a list of
+            events and a token that points to the end of the result set. If no
+            events are returned then the end of the stream has been reached
+            (i.e. there are no events between `from_key` and `to_key`).
         """
 
         from_key = RoomStreamToken.parse(from_key)
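
The clarified contract (an empty page means the end of the stream has been
reached) is what lets callers drive pagination to completion. A hedged sketch
of such a loop (argument values are illustrative):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def drain_room_history(store, room_id, from_key, to_key):
        events = []
        while True:
            page, next_token = yield store.paginate_room_events(
                room_id, from_key, to_key, direction="b", limit=100
            )
            if not page:
                # an empty page means the end of the stream was reached
                break
            events.extend(page)
            from_key = next_token
        defer.returnValue(events)
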
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b1188f6bcb..fd18619178 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -133,34 +133,6 @@ class TransactionStore(SQLBaseStore):
             desc="set_received_txn_response",
         )
 
-    def prep_send_transaction(self, transaction_id, destination, origin_server_ts):
-        """Persists an outgoing transaction and calculates the values for the
-        previous transaction id list.
-
-        This should be called before sending the transaction so that it has the
-        correct value for the `prev_ids` key.
-
-        Args:
-            transaction_id (str)
-            destination (str)
-            origin_server_ts (int)
-
-        Returns:
-            list: A list of previous transaction ids.
-        """
-        return defer.succeed([])
-
-    def delivered_txn(self, transaction_id, destination, code, response_dict):
-        """Persists the response for an outgoing transaction.
-
-        Args:
-            transaction_id (str)
-            destination (str)
-            code (int)
-            response_json (str)
-        """
-        pass
-
     @defer.inlineCallbacks
     def get_destination_retry_timings(self, destination):
         """Gets the current retry timings (if any) for a given destination.