diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index e3f086f1c3..0ade47e624 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -147,3 +148,7 @@ class EventContentFields(object):
# Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326
LABELS = "org.matrix.labels"
+
+ # Timestamp to delete the event after
+ # cf https://github.com/matrix-org/matrix-doc/pull/2228
+ SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index bec13f08d8..6eab1f13f0 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 7a9d711669..a4bef00936 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -118,15 +118,16 @@ class ServerConfig(Config):
self.allow_public_rooms_without_auth = False
self.allow_public_rooms_over_federation = False
else:
- # If set to 'False', requires authentication to access the server's public
- # rooms directory through the client API. Defaults to 'True'.
+ # If set to 'true', removes the need for authentication to access the server's
+ # public rooms directory through the client API, meaning that anyone can
+ # query the room directory. Defaults to 'false'.
self.allow_public_rooms_without_auth = config.get(
- "allow_public_rooms_without_auth", True
+ "allow_public_rooms_without_auth", False
)
- # If set to 'False', forbids any other homeserver to fetch the server's public
- # rooms directory via federation. Defaults to 'True'.
+ # If set to 'true', allows any other homeserver to fetch the server's public
+ # rooms directory via federation. Defaults to 'false'.
self.allow_public_rooms_over_federation = config.get(
- "allow_public_rooms_over_federation", True
+ "allow_public_rooms_over_federation", False
)
default_room_version = config.get("default_room_version", DEFAULT_ROOM_VERSION)
@@ -490,6 +491,8 @@ class ServerConfig(Config):
"cleanup_extremities_with_dummy_events", True
)
+ self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False)
+
def has_tls_listener(self) -> bool:
return any(l["tls"] for l in self.listeners)
@@ -618,15 +621,16 @@ class ServerConfig(Config):
#
#require_auth_for_profile_requests: true
- # If set to 'false', requires authentication to access the server's public rooms
- # directory through the client API. Defaults to 'true'.
+ # If set to 'true', removes the need for authentication to access the server's
+ # public rooms directory through the client API, meaning that anyone can
+ # query the room directory. Defaults to 'false'.
#
- #allow_public_rooms_without_auth: false
+ #allow_public_rooms_without_auth: true
- # If set to 'false', forbids any other homeserver to fetch the server's public
- # rooms directory via federation. Defaults to 'true'.
+ # If set to 'true', allows any other homeserver to fetch the server's public
+ # rooms directory via federation. Defaults to 'false'.
#
- #allow_public_rooms_over_federation: false
+ #allow_public_rooms_over_federation: true
# The default room version for newly created rooms.
#
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d3267734f7..d9d0cd9eef 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -121,6 +121,7 @@ class FederationHandler(BaseHandler):
self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
+ self._message_handler = hs.get_message_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
@@ -141,6 +142,8 @@ class FederationHandler(BaseHandler):
self.third_party_event_rules = hs.get_third_party_event_rules()
+ self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
+
@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False):
""" Process a PDU received via a federation /send/ transaction, or
@@ -2715,6 +2718,11 @@ class FederationHandler(BaseHandler):
event_and_contexts, backfilled=backfilled
)
+ if self._ephemeral_messages_enabled:
+ for (event, context) in event_and_contexts:
+ # If there's an expiry timestamp on the event, schedule its expiry.
+ self._message_handler.maybe_schedule_expiry(event)
+
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
yield self._notify_persisted_event(event, max_stream_id)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3b0156f516..4f53a5f5dc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import Optional
from six import iteritems, itervalues, string_types
@@ -22,9 +23,16 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
+from twisted.internet.interfaces import IDelayedCall
from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ Membership,
+ RelationTypes,
+ UserTypes,
+)
from synapse.api.errors import (
AuthError,
Codes,
@@ -62,6 +70,17 @@ class MessageHandler(object):
self.storage = hs.get_storage()
self.state_store = self.storage.state
self._event_serializer = hs.get_event_client_serializer()
+ self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+ self._is_worker_app = bool(hs.config.worker_app)
+
+ # The scheduled call to self._expire_event. None if no call is currently
+ # scheduled.
+ self._scheduled_expiry = None # type: Optional[IDelayedCall]
+
+ if not hs.config.worker_app:
+ run_as_background_process(
+ "_schedule_next_expiry", self._schedule_next_expiry
+ )
@defer.inlineCallbacks
def get_room_data(
@@ -225,6 +244,100 @@ class MessageHandler(object):
for user_id, profile in iteritems(users_with_profile)
}
+ def maybe_schedule_expiry(self, event):
+ """Schedule the expiry of an event if there's not already one scheduled,
+ or if the one running is for an event that will expire after the provided
+ timestamp.
+
+ This function needs to invalidate the event cache, which is only possible on
+ the master process, and therefore needs to be run on there.
+
+ Args:
+ event (EventBase): The event to schedule the expiry of.
+ """
+ assert not self._is_worker_app
+
+ expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
+ if not isinstance(expiry_ts, int) or event.is_state():
+ return
+
+ # _schedule_expiry_for_event won't actually schedule anything if there's already
+ # a task scheduled for a timestamp that's sooner than the provided one.
+ self._schedule_expiry_for_event(event.event_id, expiry_ts)
+
+ @defer.inlineCallbacks
+ def _schedule_next_expiry(self):
+ """Retrieve the ID and the expiry timestamp of the next event to be expired,
+ and schedule an expiry task for it.
+
+ If there's no event left to expire, set _expiry_scheduled to None so that a
+ future call to save_expiry_ts can schedule a new expiry task.
+ """
+ # Try to get the expiry timestamp of the next event to expire.
+ res = yield self.store.get_next_event_to_expire()
+ if res:
+ event_id, expiry_ts = res
+ self._schedule_expiry_for_event(event_id, expiry_ts)
+
+ def _schedule_expiry_for_event(self, event_id, expiry_ts):
+ """Schedule an expiry task for the provided event if there's not already one
+ scheduled at a timestamp that's sooner than the provided one.
+
+ Args:
+ event_id (str): The ID of the event to expire.
+ expiry_ts (int): The timestamp at which to expire the event.
+ """
+ if self._scheduled_expiry:
+ # If the provided timestamp refers to a time before the scheduled time of the
+ # next expiry task, cancel that task and reschedule it for this timestamp.
+ next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000
+ if expiry_ts < next_scheduled_expiry_ts:
+ self._scheduled_expiry.cancel()
+ else:
+ return
+
+ # Figure out how many seconds we need to wait before expiring the event.
+ now_ms = self.clock.time_msec()
+ delay = (expiry_ts - now_ms) / 1000
+
+ # callLater doesn't support negative delays, so trim the delay to 0 if we're
+ # in that case.
+ if delay < 0:
+ delay = 0
+
+ logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)
+
+ self._scheduled_expiry = self.clock.call_later(
+ delay,
+ run_as_background_process,
+ "_expire_event",
+ self._expire_event,
+ event_id,
+ )
+
+ @defer.inlineCallbacks
+ def _expire_event(self, event_id):
+ """Retrieve and expire an event that needs to be expired from the database.
+
+ If the event doesn't exist in the database, log it and delete the expiry date
+ from the database (so that we don't try to expire it again).
+ """
+ assert self._ephemeral_events_enabled
+
+ self._scheduled_expiry = None
+
+ logger.info("Expiring event %s", event_id)
+
+ try:
+ # Expire the event if we know about it. This function also deletes the expiry
+ # date from the database in the same database transaction.
+ yield self.store.expire_event(event_id)
+ except Exception as e:
+ logger.error("Could not expire event %s: %r", event_id, e)
+
+ # Schedule the expiry of the next event to expire.
+ yield self._schedule_next_expiry()
+
# The duration (in ms) after which rooms should be removed
# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
@@ -295,6 +408,10 @@ class EventCreationHandler(object):
5 * 60 * 1000,
)
+ self._message_handler = hs.get_message_handler()
+
+ self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+
@defer.inlineCallbacks
def create_event(
self,
@@ -877,6 +994,10 @@ class EventCreationHandler(object):
event, context=context
)
+ if self._ephemeral_events_enabled:
+ # If there's an expiry timestamp on the event, schedule its expiry.
+ self._message_handler.maybe_schedule_expiry(event)
+
yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
def _notify():
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e92b2eafd5..22768e97ff 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -198,21 +199,21 @@ class RoomCreationHandler(BaseHandler):
# finally, shut down the PLs in the old room, and update them in the new
# room.
yield self._update_upgraded_room_pls(
- requester, old_room_id, new_room_id, old_room_state
+ requester, old_room_id, new_room_id, old_room_state,
)
return new_room_id
@defer.inlineCallbacks
def _update_upgraded_room_pls(
- self, requester, old_room_id, new_room_id, old_room_state
+ self, requester, old_room_id, new_room_id, old_room_state,
):
"""Send updated power levels in both rooms after an upgrade
Args:
requester (synapse.types.Requester): the user requesting the upgrade
- old_room_id (unicode): the id of the room to be replaced
- new_room_id (unicode): the id of the replacement room
+ old_room_id (str): the id of the room to be replaced
+ new_room_id (str): the id of the replacement room
old_room_state (dict[tuple[str, str], str]): the state map for the old room
Returns:
@@ -298,7 +299,7 @@ class RoomCreationHandler(BaseHandler):
tombstone_event_id (unicode|str): the ID of the tombstone event in the old
room.
Returns:
- Deferred[None]
+ Deferred
"""
user_id = requester.user.to_string()
@@ -333,6 +334,7 @@ class RoomCreationHandler(BaseHandler):
(EventTypes.Encryption, ""),
(EventTypes.ServerACL, ""),
(EventTypes.RelatedGroups, ""),
+ (EventTypes.PowerLevels, ""),
)
old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -346,6 +348,31 @@ class RoomCreationHandler(BaseHandler):
if old_event:
initial_state[k] = old_event.content
+ # Resolve the minimum power level required to send any state event
+ # We will give the upgrading user this power level temporarily (if necessary) such that
+ # they are able to copy all of the state events over, then revert them back to their
+ # original power level afterwards in _update_upgraded_room_pls
+
+ # Copy over user power levels now as this will not be possible with >100PL users once
+ # the room has been created
+
+ power_levels = initial_state[(EventTypes.PowerLevels, "")]
+
+ # Calculate the minimum power level needed to clone the room
+ event_power_levels = power_levels.get("events", {})
+ state_default = power_levels.get("state_default", 0)
+ ban = power_levels.get("ban")
+ needed_power_level = max(state_default, ban, max(event_power_levels.values()))
+
+ # Raise the requester's power level in the new room if necessary
+ current_power_level = power_levels["users"][requester.user.to_string()]
+ if current_power_level < needed_power_level:
+ # Assign this power level to the requester
+ power_levels["users"][requester.user.to_string()] = needed_power_level
+
+ # Set the power levels to the modified state
+ initial_state[(EventTypes.PowerLevels, "")] = power_levels
+
yield self._send_events_for_new_room(
requester,
new_room_id,
@@ -874,6 +901,10 @@ class RoomContextHandler(object):
room_id, event_id, before_limit, after_limit, event_filter
)
+ if event_filter:
+ results["events_before"] = event_filter.filter(results["events_before"])
+ results["events_after"] = event_filter.filter(results["events_after"])
+
results["events_before"] = yield filter_evts(results["events_before"])
results["events_after"] = yield filter_evts(results["events_after"])
results["event"] = event
@@ -902,7 +933,12 @@ class RoomContextHandler(object):
state = yield self.state_store.get_state_for_events(
[last_event_id], state_filter=state_filter
)
- results["state"] = list(state[last_event_id].values())
+
+ state_events = list(state[last_event_id].values())
+ if event_filter:
+ state_events = event_filter.filter(state_events)
+
+ results["state"] = state_events
# We use a dummy token here as we only care about the room portion of
# the token, which we replace.
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 05fc64f409..03934956f4 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -256,6 +256,7 @@ class TerseJSONToTCPLogObserver(object):
# transport is the same, just trigger a resumeProducing.
if self._producer and r.transport is self._producer.transport:
self._producer.resumeProducing()
+ self._connection_waiter = None
return
# If the producer is still producing, stop it.
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index bb30ce3f34..2a477ad22e 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py
index 8cf415e29d..c234ea7421 100644
--- a/synapse/rest/media/v1/thumbnailer.py
+++ b/synapse/rest/media/v1/thumbnailer.py
@@ -129,5 +129,8 @@ class Thumbnailer(object):
def _encode_image(self, output_image, output_type):
output_bytes_io = BytesIO()
- output_image.save(output_bytes_io, self.FORMATS[output_type], quality=80)
+ fmt = self.FORMATS[output_type]
+ if fmt == "JPEG":
+ output_image = output_image.convert("RGB")
+ output_image.save(output_bytes_io, fmt, quality=80)
return output_bytes_io
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index d8ad59ad93..643327b57b 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -145,13 +145,28 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
txn.execute(signature_sql, signature_query_params)
rows = self.cursor_to_dict(txn)
+ # add each cross-signing signature to the correct device in the result dict.
for row in rows:
+ signing_user_id = row["user_id"]
+ signing_key_id = row["key_id"]
target_user_id = row["target_user_id"]
target_device_id = row["target_device_id"]
- if target_user_id in result and target_device_id in result[target_user_id]:
- result[target_user_id][target_device_id].setdefault(
- "signatures", {}
- ).setdefault(row["user_id"], {})[row["key_id"]] = row["signature"]
+ signature = row["signature"]
+
+ target_user_result = result.get(target_user_id)
+ if not target_user_result:
+ continue
+
+ target_device_result = target_user_result.get(target_device_id)
+ if not target_device_result:
+ # note that target_device_result will be None for deleted devices.
+ continue
+
+ target_device_signatures = target_device_result.setdefault("signatures", {})
+ signing_user_signatures = target_device_signatures.setdefault(
+ signing_user_id, {}
+ )
+ signing_user_signatures[signing_key_id] = signature
log_kv(result)
return result
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 2737a1d3ae..79c91fe284 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -130,6 +130,8 @@ class EventsStore(
if self.hs.config.redaction_retention_period is not None:
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
+ self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
+
@defer.inlineCallbacks
def _read_forward_extremities(self):
def fetch(txn):
@@ -940,6 +942,12 @@ class EventsStore(
txn, event.event_id, labels, event.room_id, event.depth
)
+ if self._ephemeral_messages_enabled:
+ # If there's an expiry timestamp on the event, store it.
+ expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
+ if isinstance(expiry_ts, int) and not event.is_state():
+ self._insert_event_expiry_txn(txn, event.event_id, expiry_ts)
+
# Insert into the room_memberships table.
self._store_room_members_txn(
txn,
@@ -1101,12 +1109,7 @@ class EventsStore(
def _update_censor_txn(txn):
for redaction_id, event_id, pruned_json in updates:
if pruned_json:
- self._simple_update_one_txn(
- txn,
- table="event_json",
- keyvalues={"event_id": event_id},
- updatevalues={"json": pruned_json},
- )
+ self._censor_event_txn(txn, event_id, pruned_json)
self._simple_update_one_txn(
txn,
@@ -1117,6 +1120,22 @@ class EventsStore(
yield self.runInteraction("_update_censor_txn", _update_censor_txn)
+ def _censor_event_txn(self, txn, event_id, pruned_json):
+ """Censor an event by replacing its JSON in the event_json table with the
+ provided pruned JSON.
+
+ Args:
+ txn (LoggingTransaction): The database transaction.
+ event_id (str): The ID of the event to censor.
+ pruned_json (str): The pruned JSON
+ """
+ self._simple_update_one_txn(
+ txn,
+ table="event_json",
+ keyvalues={"event_id": event_id},
+ updatevalues={"json": pruned_json},
+ )
+
@defer.inlineCallbacks
def count_daily_messages(self):
"""
@@ -1957,6 +1976,101 @@ class EventsStore(
],
)
+ def _insert_event_expiry_txn(self, txn, event_id, expiry_ts):
+ """Save the expiry timestamp associated with a given event ID.
+
+ Args:
+ txn (LoggingTransaction): The database transaction to use.
+ event_id (str): The event ID the expiry timestamp is associated with.
+ expiry_ts (int): The timestamp at which to expire (delete) the event.
+ """
+ return self._simple_insert_txn(
+ txn=txn,
+ table="event_expiry",
+ values={"event_id": event_id, "expiry_ts": expiry_ts},
+ )
+
+ @defer.inlineCallbacks
+ def expire_event(self, event_id):
+ """Retrieve and expire an event that has expired, and delete its associated
+ expiry timestamp. If the event can't be retrieved, delete its associated
+ timestamp so we don't try to expire it again in the future.
+
+ Args:
+ event_id (str): The ID of the event to delete.
+ """
+ # Try to retrieve the event's content from the database or the event cache.
+ event = yield self.get_event(event_id)
+
+ def delete_expired_event_txn(txn):
+ # Delete the expiry timestamp associated with this event from the database.
+ self._delete_event_expiry_txn(txn, event_id)
+
+ if not event:
+ # If we can't find the event, log a warning and delete the expiry date
+ # from the database so that we don't try to expire it again in the
+ # future.
+ logger.warning(
+ "Can't expire event %s because we don't have it.", event_id
+ )
+ return
+
+ # Prune the event's dict then convert it to JSON.
+ pruned_json = encode_json(prune_event_dict(event.get_dict()))
+
+ # Update the event_json table to replace the event's JSON with the pruned
+ # JSON.
+ self._censor_event_txn(txn, event.event_id, pruned_json)
+
+ # We need to invalidate the event cache entry for this event because we
+ # changed its content in the database. We can't call
+ # self._invalidate_cache_and_stream because self.get_event_cache isn't of the
+ # right type.
+ txn.call_after(self._get_event_cache.invalidate, (event.event_id,))
+ # Send that invalidation to replication so that other workers also invalidate
+ # the event cache.
+ self._send_invalidation_to_replication(
+ txn, "_get_event_cache", (event.event_id,)
+ )
+
+ yield self.runInteraction("delete_expired_event", delete_expired_event_txn)
+
+ def _delete_event_expiry_txn(self, txn, event_id):
+ """Delete the expiry timestamp associated with an event ID without deleting the
+ actual event.
+
+ Args:
+ txn (LoggingTransaction): The transaction to use to perform the deletion.
+ event_id (str): The event ID to delete the associated expiry timestamp of.
+ """
+ return self._simple_delete_txn(
+ txn=txn, table="event_expiry", keyvalues={"event_id": event_id}
+ )
+
+ def get_next_event_to_expire(self):
+ """Retrieve the entry with the lowest expiry timestamp in the event_expiry
+ table, or None if there's no more event to expire.
+
+ Returns: Deferred[Optional[Tuple[str, int]]]
+ A tuple containing the event ID as its first element and an expiry timestamp
+ as its second one, if there's at least one row in the event_expiry table.
+ None otherwise.
+ """
+
+ def get_next_event_to_expire_txn(txn):
+ txn.execute(
+ """
+ SELECT event_id, expiry_ts FROM event_expiry
+ ORDER BY expiry_ts ASC LIMIT 1
+ """
+ )
+
+ return txn.fetchone()
+
+ return self.runInteraction(
+ desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
+ )
+
AllNewEventsResult = namedtuple(
"AllNewEventsResult",
diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql
new file mode 100644
index 0000000000..81a36a8b1d
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql
@@ -0,0 +1,21 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_expiry (
+ event_id TEXT PRIMARY KEY,
+ expiry_ts BIGINT NOT NULL
+);
+
+CREATE INDEX event_expiry_expiry_ts_idx ON event_expiry(expiry_ts);
diff --git a/synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql b/synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql
index 27a96123e3..5c5fffcafb 100644
--- a/synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql
+++ b/synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql
@@ -40,7 +40,8 @@ CREATE TABLE IF NOT EXISTS e2e_cross_signing_signatures (
signature TEXT NOT NULL
);
-CREATE UNIQUE INDEX e2e_cross_signing_signatures_idx ON e2e_cross_signing_signatures(user_id, target_user_id, target_device_id);
+-- replaced by the index created in signing_keys_nonunique_signatures.sql
+-- CREATE UNIQUE INDEX e2e_cross_signing_signatures_idx ON e2e_cross_signing_signatures(user_id, target_user_id, target_device_id);
-- stream of user signature updates
CREATE TABLE IF NOT EXISTS user_signature_stream (
diff --git a/synapse/storage/data_stores/main/schema/delta/56/signing_keys_nonunique_signatures.sql b/synapse/storage/data_stores/main/schema/delta/56/signing_keys_nonunique_signatures.sql
new file mode 100644
index 0000000000..0aa90ebf0c
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/signing_keys_nonunique_signatures.sql
@@ -0,0 +1,22 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* The cross-signing signatures index should not be a unique index, because a
+ * user may upload multiple signatures for the same target user. The previous
+ * index was unique, so delete it if it's there and create a new non-unique
+ * index. */
+
+DROP INDEX IF EXISTS e2e_cross_signing_signatures_idx; CREATE INDEX IF NOT
+EXISTS e2e_cross_signing_signatures2_idx ON e2e_cross_signing_signatures(user_id, target_user_id, target_device_id);
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 9ae4a913a1..21a410afd0 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
|