summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py6
-rw-r--r--synapse/storage/devices.py77
-rw-r--r--synapse/storage/events.py24
-rw-r--r--synapse/storage/registration.py28
-rw-r--r--synapse/storage/schema/delta/33/devices.sql21
-rw-r--r--synapse/storage/schema/delta/33/refreshtoken_device.sql16
6 files changed, 152 insertions, 20 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e93c3de66c..73fb334dd6 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 
 from twisted.internet import defer
+
+from synapse.storage.devices import DeviceStore
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
@@ -80,6 +82,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 EventPushActionsStore,
                 OpenIdStore,
                 ClientIpStore,
+                DeviceStore,
                 ):
 
     def __init__(self, db_conn, hs):
@@ -92,7 +95,8 @@ class DataStore(RoomMemberStore, RoomStore,
             extra_tables=[("local_invites", "stream_id")]
         )
         self._backfill_id_gen = StreamIdGenerator(
-            db_conn, "events", "stream_ordering", step=-1
+            db_conn, "events", "stream_ordering", step=-1,
+            extra_tables=[("ex_outlier_stream", "event_stream_ordering")]
         )
         self._receipts_id_gen = StreamIdGenerator(
             db_conn, "receipts_linearized", "stream_id"
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
new file mode 100644
index 0000000000..9065e96d28
--- /dev/null
+++ b/synapse/storage/devices.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+from ._base import SQLBaseStore
+
+logger = logging.getLogger(__name__)
+
+
+class DeviceStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def store_device(self, user_id, device_id,
+                     initial_device_display_name,
+                     ignore_if_known=True):
+        """Ensure the given device is known; add it to the store if not
+
+        Args:
+            user_id (str): id of user associated with the device
+            device_id (str): id of device
+            initial_device_display_name (str): initial displayname of the
+               device
+            ignore_if_known (bool): ignore integrity errors which mean the
+               device is already known
+        Returns:
+            defer.Deferred
+        Raises:
+            StoreError: if ignore_if_known is False and the device was already
+               known
+        """
+        try:
+            yield self._simple_insert(
+                "devices",
+                values={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                    "display_name": initial_device_display_name
+                },
+                desc="store_device",
+                or_ignore=ignore_if_known,
+            )
+        except Exception as e:
+            logger.error("store_device with device_id=%s failed: %s",
+                         device_id, e)
+            raise StoreError(500, "Problem storing device.")
+
+    def get_device(self, user_id, device_id):
+        """Retrieve a device.
+
+        Args:
+            user_id (str): The ID of the user which owns the device
+            device_id (str): The ID of the device to retrieve
+        Returns:
+            defer.Deferred for a namedtuple containing the device information
+        Raises:
+            StoreError: if the device is not found
+        """
+        return self._simple_select_one(
+            table="devices",
+            keyvalues={"user_id": user_id, "device_id": device_id},
+            retcols=("user_id", "device_id", "display_name"),
+            desc="get_device",
+        )
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 91462495ab..6610549281 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1425,7 +1425,7 @@ class EventsStore(SQLBaseStore):
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
         txn.execute(
-            "SELECT e.event_id FROM events as e"
+            "SELECT DISTINCT e.event_id FROM events as e"
             " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
             " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
             " WHERE e.room_id = ? AND e.topological_ordering < ?"
@@ -1434,6 +1434,20 @@ class EventsStore(SQLBaseStore):
         )
         new_backwards_extrems = txn.fetchall()
 
+        txn.execute(
+            "DELETE FROM event_backward_extremities WHERE room_id = ?",
+            (room_id,)
+        )
+
+        # Update backward extremeties
+        txn.executemany(
+            "INSERT INTO event_backward_extremities (room_id, event_id)"
+            " VALUES (?, ?)",
+            [
+                (room_id, event_id) for event_id, in new_backwards_extrems
+            ]
+        )
+
         # Get all state groups that are only referenced by events that are
         # to be deleted.
         txn.execute(
@@ -1486,20 +1500,12 @@ class EventsStore(SQLBaseStore):
             "event_search",
             "event_signatures",
             "rejections",
-            "event_backward_extremities",
         ):
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
                 to_delete
             )
 
-        # Update backward extremeties
-        txn.executemany(
-            "INSERT INTO event_backward_extremities (room_id, event_id)"
-            " VALUES (?, ?)",
-            [(room_id, event_id) for event_id, in new_backwards_extrems]
-        )
-
         txn.executemany(
             "DELETE FROM events WHERE event_id = ?",
             to_delete
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index d957a629dc..26ef1cfd8a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -31,12 +31,14 @@ class RegistrationStore(SQLBaseStore):
         self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
-    def add_access_token_to_user(self, user_id, token):
+    def add_access_token_to_user(self, user_id, token, device_id=None):
         """Adds an access token for the given user.
 
         Args:
             user_id (str): The user ID.
             token (str): The new access token to add.
+            device_id (str): ID of the device to associate with the access
+               token
         Raises:
             StoreError if there was a problem adding this.
         """
@@ -47,18 +49,21 @@ class RegistrationStore(SQLBaseStore):
             {
                 "id": next_id,
                 "user_id": user_id,
-                "token": token
+                "token": token,
+                "device_id": device_id,
             },
             desc="add_access_token_to_user",
         )
 
     @defer.inlineCallbacks
-    def add_refresh_token_to_user(self, user_id, token):
+    def add_refresh_token_to_user(self, user_id, token, device_id=None):
         """Adds a refresh token for the given user.
 
         Args:
             user_id (str): The user ID.
             token (str): The new refresh token to add.
+            device_id (str): ID of the device to associate with the access
+               token
         Raises:
             StoreError if there was a problem adding this.
         """
@@ -69,7 +74,8 @@ class RegistrationStore(SQLBaseStore):
             {
                 "id": next_id,
                 "user_id": user_id,
-                "token": token
+                "token": token,
+                "device_id": device_id,
             },
             desc="add_refresh_token_to_user",
         )
@@ -291,18 +297,18 @@ class RegistrationStore(SQLBaseStore):
         )
 
     def exchange_refresh_token(self, refresh_token, token_generator):
-        """Exchange a refresh token for a new access token and refresh token.
+        """Exchange a refresh token for a new one.
 
         Doing so invalidates the old refresh token - refresh tokens are single
         use.
 
         Args:
-            token (str): The refresh token of a user.
+            refresh_token (str): The refresh token of a user.
             token_generator (fn: str -> str): Function which, when given a
                 user ID, returns a unique refresh token for that user. This
                 function must never return the same value twice.
         Returns:
-            tuple of (user_id, refresh_token)
+            tuple of (user_id, new_refresh_token, device_id)
         Raises:
             StoreError if no user was found with that refresh token.
         """
@@ -314,12 +320,13 @@ class RegistrationStore(SQLBaseStore):
         )
 
     def _exchange_refresh_token(self, txn, old_token, token_generator):
-        sql = "SELECT user_id FROM refresh_tokens WHERE token = ?"
+        sql = "SELECT user_id, device_id FROM refresh_tokens WHERE token = ?"
         txn.execute(sql, (old_token,))
         rows = self.cursor_to_dict(txn)
         if not rows:
             raise StoreError(403, "Did not recognize refresh token")
         user_id = rows[0]["user_id"]
+        device_id = rows[0]["device_id"]
 
         # TODO(danielwh): Maybe perform a validation on the macaroon that
         # macaroon.user_id == user_id.
@@ -328,7 +335,7 @@ class RegistrationStore(SQLBaseStore):
         sql = "UPDATE refresh_tokens SET token = ? WHERE token = ?"
         txn.execute(sql, (new_token, old_token,))
 
-        return user_id, new_token
+        return user_id, new_token, device_id
 
     @defer.inlineCallbacks
     def is_server_admin(self, user):
@@ -356,7 +363,8 @@ class RegistrationStore(SQLBaseStore):
 
     def _query_for_auth(self, txn, token):
         sql = (
-            "SELECT users.name, users.is_guest, access_tokens.id as token_id"
+            "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
+            " access_tokens.device_id"
             " FROM users"
             " INNER JOIN access_tokens on users.name = access_tokens.user_id"
             " WHERE token = ?"
diff --git a/synapse/storage/schema/delta/33/devices.sql b/synapse/storage/schema/delta/33/devices.sql
new file mode 100644
index 0000000000..eca7268d82
--- /dev/null
+++ b/synapse/storage/schema/delta/33/devices.sql
@@ -0,0 +1,21 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE devices (
+    user_id TEXT NOT NULL,
+    device_id TEXT NOT NULL,
+    display_name TEXT,
+    CONSTRAINT device_uniqueness UNIQUE (user_id, device_id)
+);
diff --git a/synapse/storage/schema/delta/33/refreshtoken_device.sql b/synapse/storage/schema/delta/33/refreshtoken_device.sql
new file mode 100644
index 0000000000..290bd6da86
--- /dev/null
+++ b/synapse/storage/schema/delta/33/refreshtoken_device.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE refresh_tokens ADD COLUMN device_id TEXT;