diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 66a995157d..30d0e4c5dc 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -14,6 +14,7 @@
# limitations under the License.
from ._base import SQLBaseStore
+from . import engines
from twisted.internet import defer
@@ -87,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore):
@defer.inlineCallbacks
def start_doing_background_updates(self):
- while True:
- if self._background_update_timer is not None:
- return
+ assert self._background_update_timer is None, \
+ "background updates already running"
+
+ logger.info("Starting background schema updates")
+ while True:
sleep = defer.Deferred()
self._background_update_timer = self._clock.call_later(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
@@ -101,22 +104,23 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_timer = None
try:
- result = yield self.do_background_update(
+ result = yield self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except:
logger.exception("Error doing update")
-
- if result is None:
- logger.info(
- "No more background updates to do."
- " Unscheduling background update task."
- )
- return
+ else:
+ if result is None:
+ logger.info(
+ "No more background updates to do."
+ " Unscheduling background update task."
+ )
+ defer.returnValue(None)
@defer.inlineCallbacks
- def do_background_update(self, desired_duration_ms):
- """Does some amount of work on a background update
+ def do_next_background_update(self, desired_duration_ms):
+ """Does some amount of work on the next queued background update
+
Args:
desired_duration_ms(float): How long we want to spend
updating.
@@ -135,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_queue.append(update['update_name'])
if not self._background_update_queue:
+ # no work left to do
defer.returnValue(None)
+ # pop from the front, and add back to the back
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
+ res = yield self._do_background_update(update_name, desired_duration_ms)
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
+ def _do_background_update(self, update_name, desired_duration_ms):
+ logger.info("Starting update batch on background update '%s'",
+ update_name)
+
update_handler = self._background_update_handlers[update_name]
performance = self._background_update_performance.get(update_name)
@@ -202,6 +216,64 @@ class BackgroundUpdateStore(SQLBaseStore):
"""
self._background_update_handlers[update_name] = update_handler
+ def register_background_index_update(self, update_name, index_name,
+ table, columns):
+ """Helper for store classes to do a background index addition
+
+ To use:
+
+ 1. use a schema delta file to add a background update. Example:
+ INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('my_new_index', '{}');
+
+ 2. In the Store constructor, call this method
+
+ Args:
+ update_name (str): update_name to register for
+ index_name (str): name of index to add
+ table (str): table to add index to
+ columns (list[str]): columns/expressions to include in index
+ """
+
+ # if this is postgres, we add the indexes concurrently. Otherwise
+ # we fall back to doing it inline
+ if isinstance(self.database_engine, engines.PostgresEngine):
+ conc = True
+ else:
+ conc = False
+
+ sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \
+ % {
+ "conc": "CONCURRENTLY" if conc else "",
+ "name": index_name,
+ "table": table,
+ "columns": ", ".join(columns),
+ }
+
+ def create_index_concurrently(conn):
+ conn.rollback()
+            # postgres insists on autocommit for CREATE INDEX CONCURRENTLY
+ conn.set_session(autocommit=True)
+ c = conn.cursor()
+ c.execute(sql)
+ conn.set_session(autocommit=False)
+
+ def create_index(conn):
+ c = conn.cursor()
+ c.execute(sql)
+
+ @defer.inlineCallbacks
+ def updater(progress, batch_size):
+ logger.info("Adding index %s to %s", index_name, table)
+ if conc:
+ yield self.runWithConnection(create_index_concurrently)
+ else:
+ yield self.runWithConnection(create_index)
+ yield self._end_background_update(update_name)
+ defer.returnValue(1)
+
+ self.register_background_update_handler(update_name, updater)
+
def start_background_update(self, update_name, progress):
"""Starts a background update running.
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 74330a8ddf..71e5ea112f 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -13,10 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore, Cache
+import logging
from twisted.internet import defer
+from ._base import Cache
+from . import background_updates
+
+logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
@@ -24,8 +28,7 @@ from twisted.internet import defer
LAST_SEEN_GRANULARITY = 120 * 1000
-class ClientIpStore(SQLBaseStore):
-
+class ClientIpStore(background_updates.BackgroundUpdateStore):
def __init__(self, hs):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
@@ -34,6 +37,13 @@ class ClientIpStore(SQLBaseStore):
super(ClientIpStore, self).__init__(hs)
+ self.register_background_index_update(
+ "user_ips_device_index",
+ index_name="user_ips_device_id",
+ table="user_ips",
+ columns=["user_id", "device_id", "last_seen"],
+ )
+
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
now = int(self._clock.time_msec())
@@ -67,3 +77,69 @@ class ClientIpStore(SQLBaseStore):
desc="insert_client_ip",
lock=False,
)
+
+ @defer.inlineCallbacks
+ def get_last_client_ip_by_device(self, devices):
+ """For each device_id listed, give the user_ip it was last seen on
+
+ Args:
+ devices (iterable[(str, str)]): list of (user_id, device_id) pairs
+
+ Returns:
+ defer.Deferred: resolves to a dict, where the keys
+ are (user_id, device_id) tuples. The values are also dicts, with
+ keys giving the column names
+ """
+
+ res = yield self.runInteraction(
+ "get_last_client_ip_by_device",
+ self._get_last_client_ip_by_device_txn,
+ retcols=(
+ "user_id",
+ "access_token",
+ "ip",
+ "user_agent",
+ "device_id",
+ "last_seen",
+ ),
+ devices=devices
+ )
+
+ ret = {(d["user_id"], d["device_id"]): d for d in res}
+ defer.returnValue(ret)
+
+ @classmethod
+ def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols):
+ where_clauses = []
+ bindings = []
+ for (user_id, device_id) in devices:
+ if device_id is None:
+ where_clauses.append("(user_id = ? AND device_id IS NULL)")
+ bindings.extend((user_id, ))
+ else:
+ where_clauses.append("(user_id = ? AND device_id = ?)")
+ bindings.extend((user_id, device_id))
+
+ inner_select = (
+ "SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips "
+ "WHERE %(where)s "
+ "GROUP BY user_id, device_id"
+ ) % {
+ "where": " OR ".join(where_clauses),
+ }
+
+ sql = (
+ "SELECT %(retcols)s FROM user_ips "
+ "JOIN (%(inner_select)s) ips ON"
+ " user_ips.last_seen = ips.mls AND"
+ " user_ips.user_id = ips.user_id AND"
+ " (user_ips.device_id = ips.device_id OR"
+ " (user_ips.device_id IS NULL AND ips.device_id IS NULL)"
+ " )"
+ ) % {
+ "retcols": ",".join("user_ips." + c for c in retcols),
+ "inner_select": inner_select,
+ }
+
+ txn.execute(sql, bindings)
+ return cls.cursor_to_dict(txn)
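
For reference, a small standalone sketch of how _get_last_client_ip_by_device_txn builds the per-device WHERE clause for its inner query. The helper name and the example identifiers are illustrative only:

```python
def build_device_where(devices):
    # Same clause construction as _get_last_client_ip_by_device_txn: a
    # device_id of None needs "IS NULL", since "= NULL" never matches in SQL.
    where_clauses = []
    bindings = []
    for (user_id, device_id) in devices:
        if device_id is None:
            where_clauses.append("(user_id = ? AND device_id IS NULL)")
            bindings.extend((user_id,))
        else:
            where_clauses.append("(user_id = ? AND device_id = ?)")
            bindings.extend((user_id, device_id))
    return " OR ".join(where_clauses), bindings

where, args = build_device_where([
    ("@alice:example.com", "ALICEPHONE"),
    ("@bob:example.com", None),
])
# where == "(user_id = ? AND device_id = ?) OR (user_id = ? AND device_id IS NULL)"
# args  == ["@alice:example.com", "ALICEPHONE", "@bob:example.com"]
```
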
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 9065e96d28..afd6530cab 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -65,7 +65,7 @@ class DeviceStore(SQLBaseStore):
user_id (str): The ID of the user which owns the device
device_id (str): The ID of the device to retrieve
Returns:
- defer.Deferred for a namedtuple containing the device information
+ defer.Deferred for a dict containing the device information
Raises:
StoreError: if the device is not found
"""
@@ -75,3 +75,63 @@ class DeviceStore(SQLBaseStore):
retcols=("user_id", "device_id", "display_name"),
desc="get_device",
)
+
+ def delete_device(self, user_id, device_id):
+ """Delete a device.
+
+ Args:
+ user_id (str): The ID of the user which owns the device
+ device_id (str): The ID of the device to delete
+ Returns:
+ defer.Deferred
+ """
+ return self._simple_delete_one(
+ table="devices",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ desc="delete_device",
+ )
+
+ def update_device(self, user_id, device_id, new_display_name=None):
+ """Update a device.
+
+ Args:
+ user_id (str): The ID of the user which owns the device
+ device_id (str): The ID of the device to update
+ new_display_name (str|None): new displayname for device; None
+ to leave unchanged
+ Raises:
+ StoreError: if the device is not found
+ Returns:
+ defer.Deferred
+ """
+ updates = {}
+ if new_display_name is not None:
+ updates["display_name"] = new_display_name
+ if not updates:
+ return defer.succeed(None)
+ return self._simple_update_one(
+ table="devices",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ updatevalues=updates,
+ desc="update_device",
+ )
+
+ @defer.inlineCallbacks
+ def get_devices_by_user(self, user_id):
+ """Retrieve all of a user's registered devices.
+
+ Args:
+ user_id (str):
+ Returns:
+ defer.Deferred: resolves to a dict from device_id to a dict
+ containing "device_id", "user_id" and "display_name" for each
+ device.
+ """
+ devices = yield self._simple_select_list(
+ table="devices",
+ keyvalues={"user_id": user_id},
+ retcols=("user_id", "device_id", "display_name"),
+ desc="get_devices_by_user"
+ )
+
+ defer.returnValue({d["device_id"]: d for d in devices})
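
The dict that get_devices_by_user resolves to is keyed by device_id, with each value being a row dict containing the retcols above. A sketch of the shape, with made-up identifiers:

```python
{
    "ALICEPHONE": {
        "user_id": "@alice:example.com",
        "device_id": "ALICEPHONE",
        "display_name": "Alice's phone",
    },
    "ALICELAPTOP": {
        "user_id": "@alice:example.com",
        "device_id": "ALICELAPTOP",
        "display_name": None,
    },
}
```
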
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 2e89066515..62b7790e91 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import twisted.internet.defer
+
from ._base import SQLBaseStore
@@ -123,3 +125,16 @@ class EndToEndKeyStore(SQLBaseStore):
return self.runInteraction(
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)
+
+ @twisted.internet.defer.inlineCallbacks
+ def delete_e2e_keys_by_device(self, user_id, device_id):
+ yield self._simple_delete(
+ table="e2e_device_keys_json",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ desc="delete_e2e_device_keys_by_device"
+ )
+ yield self._simple_delete(
+ table="e2e_one_time_keys_json",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ desc="delete_e2e_one_time_keys_by_device"
+ )
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6610549281..c63ca36df6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -397,6 +397,12 @@ class EventsStore(SQLBaseStore):
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled):
+ """Insert some number of room events into the necessary database tables.
+
+ Rejected events are only inserted into the events table, the events_json table,
+        and the rejections table. Things reading from those tables will need to check
+ whether the event was rejected.
+ """
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
@@ -407,21 +413,11 @@ class EventsStore(SQLBaseStore):
event.room_id, event.internal_metadata.stream_ordering,
)
- if not event.internal_metadata.is_outlier():
+ if not event.internal_metadata.is_outlier() and not context.rejected:
depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth)
)
- if context.push_actions:
- self._set_push_actions_for_event_and_users_txn(
- txn, event, context.push_actions
- )
-
- if event.type == EventTypes.Redaction and event.redacts is not None:
- self._remove_push_actions_for_event_id_txn(
- txn, event.room_id, event.redacts
- )
-
for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)
@@ -431,14 +427,24 @@ class EventsStore(SQLBaseStore):
),
[event.event_id for event, _ in events_and_contexts]
)
+
have_persisted = {
event_id: outlier
for event_id, outlier in txn.fetchall()
}
+ # Remove the events that we've seen before.
event_map = {}
to_remove = set()
for event, context in events_and_contexts:
+ if context.rejected:
+ # If the event is rejected then we don't care if the event
+ # was an outlier or not.
+ if event.event_id in have_persisted:
+ # If we have already seen the event then ignore it.
+ to_remove.add(event)
+ continue
+
# Handle the case of the list including the same event multiple
# times. The tricky thing here is when they differ by whether
# they are an outlier.
@@ -463,6 +469,12 @@ class EventsStore(SQLBaseStore):
outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
+                # We received a copy of an event that we had already stored as
+                # an outlier in the database. We now have some state at that
+                # event, so we need to update the state_groups table with that
+                # state.
+
+ # insert into the state_group, state_groups_state and
+ # event_to_state_groups tables.
self._store_mult_state_groups_txn(txn, ((event, context),))
metadata_json = encode_json(
@@ -478,6 +490,8 @@ class EventsStore(SQLBaseStore):
(metadata_json, event.event_id,)
)
+ # Add an entry to the ex_outlier_stream table to replicate the
+ # change in outlier status to our workers.
stream_order = event.internal_metadata.stream_ordering
state_group_id = context.state_group or context.new_state_group_id
self._simple_insert_txn(
@@ -499,6 +513,8 @@ class EventsStore(SQLBaseStore):
(False, event.event_id,)
)
+ # Update the event_backward_extremities table now that this
+ # event isn't an outlier any more.
self._update_extremeties(txn, [event])
events_and_contexts = [
@@ -506,38 +522,12 @@ class EventsStore(SQLBaseStore):
]
if not events_and_contexts:
+ # Make sure we don't pass an empty list to functions that expect to
+ # be storing at least one element.
return
- self._store_mult_state_groups_txn(txn, events_and_contexts)
-
- self._handle_mult_prev_events(
- txn,
- events=[event for event, _ in events_and_contexts],
- )
-
- for event, _ in events_and_contexts:
- if event.type == EventTypes.Name:
- self._store_room_name_txn(txn, event)
- elif event.type == EventTypes.Topic:
- self._store_room_topic_txn(txn, event)
- elif event.type == EventTypes.Message:
- self._store_room_message_txn(txn, event)
- elif event.type == EventTypes.Redaction:
- self._store_redaction(txn, event)
- elif event.type == EventTypes.RoomHistoryVisibility:
- self._store_history_visibility_txn(txn, event)
- elif event.type == EventTypes.GuestAccess:
- self._store_guest_access_txn(txn, event)
-
- self._store_room_members_txn(
- txn,
- [
- event
- for event, _ in events_and_contexts
- if event.type == EventTypes.Member
- ],
- backfilled=backfilled,
- )
+ # From this point onwards the events are only events that we haven't
+ # seen before.
def event_dict(event):
return {
@@ -591,10 +581,41 @@ class EventsStore(SQLBaseStore):
],
)
- if context.rejected:
- self._store_rejections_txn(
- txn, event.event_id, context.rejected
- )
+ # Remove the rejected events from the list now that we've added them
+ # to the events table and the events_json table.
+ to_remove = set()
+ for event, context in events_and_contexts:
+ if context.rejected:
+ # Insert the event_id into the rejections table
+ self._store_rejections_txn(
+ txn, event.event_id, context.rejected
+ )
+ to_remove.add(event)
+
+ events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0] not in to_remove
+ ]
+
+ if not events_and_contexts:
+ # Make sure we don't pass an empty list to functions that expect to
+ # be storing at least one element.
+ return
+
+ # From this point onwards the events are only ones that weren't rejected.
+
+ for event, context in events_and_contexts:
+ # Insert all the push actions into the event_push_actions table.
+ if context.push_actions:
+ self._set_push_actions_for_event_and_users_txn(
+ txn, event, context.push_actions
+ )
+
+ if event.type == EventTypes.Redaction and event.redacts is not None:
+ # Remove the entries in the event_push_actions table for the
+ # redacted event.
+ self._remove_push_actions_for_event_id_txn(
+ txn, event.room_id, event.redacts
+ )
self._simple_insert_many_txn(
txn,
@@ -610,6 +631,49 @@ class EventsStore(SQLBaseStore):
],
)
+ # Insert into the state_groups, state_groups_state, and
+ # event_to_state_groups tables.
+ self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+ # Update the event_forward_extremities, event_backward_extremities and
+ # event_edges tables.
+ self._handle_mult_prev_events(
+ txn,
+ events=[event for event, _ in events_and_contexts],
+ )
+
+ for event, _ in events_and_contexts:
+ if event.type == EventTypes.Name:
+ # Insert into the room_names and event_search tables.
+ self._store_room_name_txn(txn, event)
+ elif event.type == EventTypes.Topic:
+ # Insert into the topics table and event_search table.
+ self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Message:
+ # Insert into the event_search table.
+ self._store_room_message_txn(txn, event)
+ elif event.type == EventTypes.Redaction:
+ # Insert into the redactions table.
+ self._store_redaction(txn, event)
+ elif event.type == EventTypes.RoomHistoryVisibility:
+ # Insert into the event_search table.
+ self._store_history_visibility_txn(txn, event)
+ elif event.type == EventTypes.GuestAccess:
+ # Insert into the event_search table.
+ self._store_guest_access_txn(txn, event)
+
+ # Insert into the room_memberships table.
+ self._store_room_members_txn(
+ txn,
+ [
+ event
+ for event, _ in events_and_contexts
+ if event.type == EventTypes.Member
+ ],
+ backfilled=backfilled,
+ )
+
+        # Insert into the event_reference_hashes table.
self._store_event_reference_hashes_txn(
txn, [event for event, _ in events_and_contexts]
)
@@ -654,6 +718,7 @@ class EventsStore(SQLBaseStore):
],
)
+ # Prefill the event cache
self._add_to_cache(txn, events_and_contexts)
if backfilled:
@@ -666,11 +731,6 @@ class EventsStore(SQLBaseStore):
# Outlier events shouldn't clobber the current state.
continue
- if context.rejected:
- # If the event failed it's auth checks then it shouldn't
- # clobbler the current state.
- continue
-
txn.call_after(
self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,)
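
To make the reordered _persist_events_txn easier to follow, here is a self-contained sketch of the extra filtering pass that now runs after the events and rejections have been written, so that push actions, state groups and the per-type handlers only ever see accepted, previously-unseen events. The namedtuples stand in for real event/context objects:

```python
from collections import namedtuple

Event = namedtuple("Event", ["event_id"])
Context = namedtuple("Context", ["rejected"])

def drop_rejected(events_and_contexts):
    # Mirrors the second to_remove pass: rejected events have already been
    # written to events/events_json/rejections, and are dropped before any
    # further per-event work happens.
    to_remove = set(
        event for event, context in events_and_contexts if context.rejected
    )
    return [ec for ec in events_and_contexts if ec[0] not in to_remove]

ok = (Event("$accepted:hs"), Context(rejected=None))
bad = (Event("$rejected:hs"), Context(rejected="auth failure"))
assert drop_rejected([ok, bad]) == [ok]
```
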
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9a92b35361..7e7d32eb66 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -18,18 +18,31 @@ import re
from twisted.internet import defer
from synapse.api.errors import StoreError, Codes
-
-from ._base import SQLBaseStore
+from synapse.storage import background_updates
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-class RegistrationStore(SQLBaseStore):
+class RegistrationStore(background_updates.BackgroundUpdateStore):
def __init__(self, hs):
super(RegistrationStore, self).__init__(hs)
self.clock = hs.get_clock()
+ self.register_background_index_update(
+ "access_tokens_device_index",
+ index_name="access_tokens_device_id",
+ table="access_tokens",
+ columns=["user_id", "device_id"],
+ )
+
+ self.register_background_index_update(
+ "refresh_tokens_device_index",
+ index_name="refresh_tokens_device_id",
+ table="refresh_tokens",
+ columns=["user_id", "device_id"],
+ )
+
@defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id=None):
"""Adds an access token for the given user.
@@ -238,16 +251,37 @@ class RegistrationStore(SQLBaseStore):
self.get_user_by_id.invalidate((user_id,))
@defer.inlineCallbacks
- def user_delete_access_tokens(self, user_id, except_token_ids=[]):
- def f(txn):
- sql = "SELECT token FROM access_tokens WHERE user_id = ?"
+ def user_delete_access_tokens(self, user_id, except_token_ids=[],
+ device_id=None,
+ delete_refresh_tokens=False):
+ """
+ Invalidate access/refresh tokens belonging to a user
+
+ Args:
+ user_id (str): ID of user the tokens belong to
+            except_token_ids (list[str]): list of access_token IDs which should
+                *not* be deleted
+ device_id (str|None): ID of device the tokens are associated with.
+ If None, tokens associated with any device (or no device) will
+ be deleted
+ delete_refresh_tokens (bool): True to delete refresh tokens as
+ well as access tokens.
+ Returns:
+ defer.Deferred:
+ """
+ def f(txn, table, except_tokens, call_after_delete):
+ sql = "SELECT token FROM %s WHERE user_id = ?" % table
clauses = [user_id]
- if except_token_ids:
+ if device_id is not None:
+ sql += " AND device_id = ?"
+ clauses.append(device_id)
+
+ if except_tokens:
sql += " AND id NOT IN (%s)" % (
- ",".join(["?" for _ in except_token_ids]),
+ ",".join(["?" for _ in except_tokens]),
)
- clauses += except_token_ids
+ clauses += except_tokens
txn.execute(sql, clauses)
@@ -256,16 +290,33 @@ class RegistrationStore(SQLBaseStore):
n = 100
chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)]
for chunk in chunks:
- for row in chunk:
- txn.call_after(self.get_user_by_access_token.invalidate, (row[0],))
+ if call_after_delete:
+ for row in chunk:
+ txn.call_after(call_after_delete, (row[0],))
txn.execute(
- "DELETE FROM access_tokens WHERE token in (%s)" % (
+ "DELETE FROM %s WHERE token in (%s)" % (
+ table,
",".join(["?" for _ in chunk]),
), [r[0] for r in chunk]
)
- yield self.runInteraction("user_delete_access_tokens", f)
+ # delete refresh tokens first, to stop new access tokens being
+ # allocated while our backs are turned
+ if delete_refresh_tokens:
+ yield self.runInteraction(
+ "user_delete_access_tokens", f,
+ table="refresh_tokens",
+ except_tokens=[],
+ call_after_delete=None,
+ )
+
+ yield self.runInteraction(
+ "user_delete_access_tokens", f,
+ table="access_tokens",
+ except_tokens=except_token_ids,
+ call_after_delete=self.get_user_by_access_token.invalidate,
+ )
def delete_access_token(self, access_token):
def f(txn):
@@ -288,9 +339,8 @@ class RegistrationStore(SQLBaseStore):
Args:
token (str): The access token of a user.
Returns:
- dict: Including the name (user_id) and the ID of their access token.
- Raises:
- StoreError if no user was found.
+ defer.Deferred: None, if the token did not match, otherwise dict
+ including the keys `name`, `is_guest`, `device_id`, `token_id`.
"""
return self.runInteraction(
"get_user_by_access_token",
diff --git a/synapse/storage/schema/delta/33/access_tokens_device_index.sql b/synapse/storage/schema/delta/33/access_tokens_device_index.sql
new file mode 100644
index 0000000000..61ad3fe3e8
--- /dev/null
+++ b/synapse/storage/schema/delta/33/access_tokens_device_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('access_tokens_device_index', '{}');
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
new file mode 100644
index 0000000000..140f2b63e0
--- /dev/null
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
@@ -0,0 +1,19 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- make sure that we have a device record for each set of E2E keys, so that the
+-- user can delete them if they like.
+INSERT INTO devices
+ SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json;
diff --git a/synapse/storage/schema/delta/33/refreshtoken_device_index.sql b/synapse/storage/schema/delta/33/refreshtoken_device_index.sql
new file mode 100644
index 0000000000..bb225dafbf
--- /dev/null
+++ b/synapse/storage/schema/delta/33/refreshtoken_device_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('refresh_tokens_device_index', '{}');
diff --git a/synapse/storage/schema/delta/33/user_ips_index.sql b/synapse/storage/schema/delta/33/user_ips_index.sql
new file mode 100644
index 0000000000..473f75a78e
--- /dev/null
+++ b/synapse/storage/schema/delta/33/user_ips_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('user_ips_device_index', '{}');
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 6c7481a728..6258ff1725 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -24,6 +24,7 @@ from collections import namedtuple
import itertools
import logging
+import ujson as json
logger = logging.getLogger(__name__)
@@ -101,7 +102,7 @@ class TransactionStore(SQLBaseStore):
)
if result and result["response_code"]:
- return result["response_code"], result["response_json"]
+ return result["response_code"], json.loads(str(result["response_json"]))
else:
return None
|