diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6be3f4d770..1c3ac03f20 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -52,7 +52,7 @@ class MessageHandler(BaseHandler):
self.pagination_lock = ReadWriteLock()
@defer.inlineCallbacks
- def purge_history(self, room_id, event_id):
+ def purge_history(self, room_id, event_id, delete_local_events=False):
event = yield self.store.get_event(event_id)
if event.room_id != room_id:
@@ -61,7 +61,7 @@ class MessageHandler(BaseHandler):
depth = event.depth
with (yield self.pagination_lock.write(room_id)):
- yield self.store.delete_old_state(room_id, depth)
+ yield self.store.purge_history(room_id, depth, delete_local_events)
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 71420e54db..ef8e62901b 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -148,11 +148,13 @@ def parse_string_from_args(args, name, default=None, required=False,
return default
-def parse_json_value_from_request(request):
+def parse_json_value_from_request(request, allow_empty_body=False):
"""Parse a JSON value from the body of a twisted HTTP request.
Args:
request: the twisted HTTP request.
+ allow_empty_body (bool): if True, an empty body will be accepted and
+ turned into None
Returns:
The JSON value.
@@ -165,6 +167,9 @@ def parse_json_value_from_request(request):
except Exception:
raise SynapseError(400, "Error reading JSON content.")
+ if not content_bytes and allow_empty_body:
+ return None
+
try:
content = simplejson.loads(content_bytes)
except Exception as e:
@@ -174,17 +179,24 @@ def parse_json_value_from_request(request):
return content
-def parse_json_object_from_request(request):
+def parse_json_object_from_request(request, allow_empty_body=False):
"""Parse a JSON object from the body of a twisted HTTP request.
Args:
request: the twisted HTTP request.
+ allow_empty_body (bool): if True, an empty body will be accepted and
+ turned into an empty dict.
Raises:
SynapseError if the request body couldn't be decoded as JSON or
if it wasn't a JSON object.
"""
- content = parse_json_value_from_request(request)
+ content = parse_json_value_from_request(
+ request, allow_empty_body=allow_empty_body,
+ )
+
+ if allow_empty_body and content is None:
+ return {}
if type(content) != dict:
message = "Content must be a JSON object."
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 7052333c19..97b631e60d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -36,7 +36,7 @@ REQUIREMENTS = {
"pydenticon": ["pydenticon"],
"ujson": ["ujson"],
"blist": ["blist"],
- "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
+ "pysaml2>=3.0.0": ["saml2>=3.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 20c5c66632..6073cc6fa2 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -129,7 +129,14 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
- yield self.handlers.message_handler.purge_history(room_id, event_id)
+ body = parse_json_object_from_request(request, allow_empty_body=True)
+
+ delete_local_events = bool(body.get("delete_local_events", False))
+
+ yield self.handlers.message_handler.purge_history(
+ room_id, event_id,
+ delete_local_events=delete_local_events,
+ )
defer.returnValue((200, {}))
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index e8e8b3986d..3f8d4b9c22 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -70,6 +70,12 @@ class MediaStorage(object):
_write_file_synchronously, source, fname,
))
+ # Tell the storage providers about the new file. They'll decide
+ # if they should upload it and whether to do so synchronously
+ # or not.
+ for provider in self.storage_providers:
+ yield provider.store_file(path, file_info)
+
defer.returnValue(fname)
@contextlib.contextmanager
diff --git a/synapse/state.py b/synapse/state.py
index cc93bbcb6b..932f602508 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -350,7 +350,7 @@ class StateHandler(object):
))
result = yield self._state_resolution_handler.resolve_state_groups(
- room_id, state_groups_ids, self._state_map_factory,
+ room_id, state_groups_ids, None, self._state_map_factory,
)
defer.returnValue(result)
@@ -413,7 +413,9 @@ class StateResolutionHandler(object):
@defer.inlineCallbacks
@log_function
- def resolve_state_groups(self, room_id, state_groups_ids, state_map_factory):
+ def resolve_state_groups(
+ self, room_id, state_groups_ids, event_map, state_map_factory,
+ ):
"""Resolves conflicts between a set of state groups
Always generates a new state group (unless we hit the cache), so should
@@ -425,6 +427,14 @@ class StateResolutionHandler(object):
map from state group id to the state in that state group
(where 'state' is a map from state key to event id)
+ event_map(dict[str,FrozenEvent]|None):
+ a dict from event_id to event, for any events that we happen to
+ have in flight (eg, those currently being persisted). This will be
+ used as a starting point fof finding the state we need; any missing
+ events will be requested via state_map_factory.
+
+ If None, all events will be fetched via state_map_factory.
+
Returns:
Deferred[_StateCacheEntry]: resolved state
"""
@@ -465,6 +475,7 @@ class StateResolutionHandler(object):
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_factory(
state_groups_ids.values(),
+ event_map=event_map,
state_map_factory=state_map_factory,
)
else:
@@ -597,11 +608,20 @@ def _seperate(state_sets):
@defer.inlineCallbacks
-def resolve_events_with_factory(state_sets, state_map_factory):
+def resolve_events_with_factory(state_sets, event_map, state_map_factory):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
+
+ event_map(dict[str,FrozenEvent]|None):
+ a dict from event_id to event, for any events that we happen to
+ have in flight (eg, those currently being persisted). This will be
+ used as a starting point fof finding the state we need; any missing
+ events will be requested via state_map_factory.
+
+ If None, all events will be fetched via state_map_factory.
+
state_map_factory(func): will be called
with a list of event_ids that are needed, and should return with
a Deferred of dict of event_id to event.
@@ -622,12 +642,16 @@ def resolve_events_with_factory(state_sets, state_map_factory):
for event_ids in conflicted_state.itervalues()
for event_id in event_ids
)
+ if event_map is not None:
+ needed_events -= set(event_map.iterkeys())
logger.info("Asking for %d conflicted events", len(needed_events))
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
- # the state events which are in conflict.
+ # the state events which are in conflict (and those in event_map)
state_map = yield state_map_factory(needed_events)
+ if event_map is not None:
+ state_map.update(event_map)
# get the ids of the auth events which allow us to authenticate the
# conflicted state, picking only from the unconflicting state.
@@ -639,6 +663,8 @@ def resolve_events_with_factory(state_sets, state_map_factory):
new_needed_events = set(auth_events.itervalues())
new_needed_events -= needed_events
+ if event_map is not None:
+ new_needed_events -= set(event_map.iterkeys())
logger.info("Asking for %d auth events", len(new_needed_events))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index af56f1ee57..bbb6aa992c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,7 +27,6 @@ from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
-from synapse.state import resolve_events_with_factory
from synapse.util.caches.descriptors import cached
from synapse.types import get_domain_from_id
@@ -237,6 +236,8 @@ class EventsStore(SQLBaseStore):
self._event_persist_queue = _EventPeristenceQueue()
+ self._state_resolution_handler = hs.get_state_resolution_handler()
+
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
@@ -402,6 +403,7 @@ class EventsStore(SQLBaseStore):
"Calculating state delta for room %s", room_id,
)
current_state = yield self._get_new_state_after_events(
+ room_id,
ev_ctx_rm, new_latest_event_ids,
)
if current_state is not None:
@@ -487,11 +489,14 @@ class EventsStore(SQLBaseStore):
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
- def _get_new_state_after_events(self, events_context, new_latest_event_ids):
+ def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
"""Calculate the current state dict after adding some new events to
a room
Args:
+ room_id (str):
+ room to which the events are being added. Used for logging etc
+
events_context (list[(EventBase, EventContext)]):
events and contexts which are being added to the room
@@ -503,8 +508,12 @@ class EventsStore(SQLBaseStore):
None if there are no changes to the room state, or
a dict of (type, state_key) -> event_id].
"""
- state_sets = []
- state_groups = set()
+
+ if not new_latest_event_ids:
+ defer.returnValue({})
+
+ # map from state_group to ((type, key) -> event_id) state map
+ state_groups = {}
missing_event_ids = []
was_updated = False
for event_id in new_latest_event_ids:
@@ -515,16 +524,19 @@ class EventsStore(SQLBaseStore):
if ctx.current_state_ids is None:
raise Exception("Unknown current state")
+ if ctx.state_group is None:
+ # I don't think this can happen, but let's double-check
+ raise Exception(
+ "Context for new extremity event %s has no state "
+ "group" % (event_id, ),
+ )
+
# If we've already seen the state group don't bother adding
# it to the state sets again
if ctx.state_group not in state_groups:
- state_sets.append(ctx.current_state_ids)
+ state_groups[ctx.state_group] = ctx.current_state_ids
if ctx.delta_ids or hasattr(ev, "state_key"):
was_updated = True
- if ctx.state_group:
- # Add this as a seen state group (if it has a state
- # group)
- state_groups.add(ctx.state_group)
break
else:
# If we couldn't find it, then we'll need to pull
@@ -532,65 +544,37 @@ class EventsStore(SQLBaseStore):
was_updated = True
missing_event_ids.append(event_id)
+ if not was_updated:
+ return
+
if missing_event_ids:
# Now pull out the state for any missing events from DB
event_to_groups = yield self._get_state_group_for_events(
missing_event_ids,
)
- groups = set(event_to_groups.itervalues()) - state_groups
+ groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
if groups:
group_to_state = yield self._get_state_for_groups(groups)
- state_sets.extend(group_to_state.itervalues())
+ state_groups.update(group_to_state)
- if not new_latest_event_ids:
- defer.returnValue({})
- elif was_updated:
- if len(state_sets) == 1:
- # If there is only one state set, then we know what the current
- # state is.
- defer.returnValue(state_sets[0])
- else:
- # We work out the current state by passing the state sets to the
- # state resolution algorithm. It may ask for some events, including
- # the events we have yet to persist, so we need a slightly more
- # complicated event lookup function than simply looking the events
- # up in the db.
-
- logger.info(
- "Resolving state with %i state sets", len(state_sets),
- )
+ if len(state_groups) == 1:
+ # If there is only one state group, then we know what the current
+ # state is.
+ defer.returnValue(state_groups.values()[0])
- events_map = {ev.event_id: ev for ev, _ in events_context}
-
- @defer.inlineCallbacks
- def get_events(ev_ids):
- # We get the events by first looking at the list of events we
- # are trying to persist, and then fetching the rest from the DB.
- db = []
- to_return = {}
- for ev_id in ev_ids:
- ev = events_map.get(ev_id, None)
- if ev:
- to_return[ev_id] = ev
- else:
- db.append(ev_id)
-
- if db:
- evs = yield self.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
- to_return.update(evs)
- defer.returnValue(to_return)
+ def get_events(ev_ids):
+ return self.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False,
+ )
+ events_map = {ev.event_id: ev for ev, _ in events_context}
+ logger.debug("calling resolve_state_groups from preserve_events")
+ res = yield self._state_resolution_handler.resolve_state_groups(
+ room_id, state_groups, events_map, get_events
+ )
- current_state = yield resolve_events_with_factory(
- state_sets,
- state_map_factory=get_events,
- )
- defer.returnValue(current_state)
- else:
- return
+ defer.returnValue(res.state)
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -2063,16 +2047,32 @@ class EventsStore(SQLBaseStore):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
- def delete_old_state(self, room_id, topological_ordering):
- return self.runInteraction(
- "delete_old_state",
- self._delete_old_state_txn, room_id, topological_ordering
- )
+ def purge_history(
+ self, room_id, topological_ordering, delete_local_events,
+ ):
+ """Deletes room history before a certain point
+
+ Args:
+ room_id (str):
+
+ topological_ordering (int):
+ minimum topo ordering to preserve
- def _delete_old_state_txn(self, txn, room_id, topological_ordering):
- """Deletes old room state
+ delete_local_events (bool):
+ if True, we will delete local events as well as remote ones
+ (instead of just marking them as outliers and deleting their
+ state groups).
"""
+ return self.runInteraction(
+ "purge_history",
+ self._purge_history_txn, room_id, topological_ordering,
+ delete_local_events,
+ )
+
+ def _purge_history_txn(
+ self, txn, room_id, topological_ordering, delete_local_events,
+ ):
# Tables that should be pruned:
# event_auth
# event_backward_extremities
@@ -2113,7 +2113,7 @@ class EventsStore(SQLBaseStore):
400, "topological_ordering is greater than forward extremeties"
)
- logger.debug("[purge] looking for events to delete")
+ logger.info("[purge] looking for events to delete")
txn.execute(
"SELECT event_id, state_key FROM events"
@@ -2125,16 +2125,16 @@ class EventsStore(SQLBaseStore):
to_delete = [
(event_id,) for event_id, state_key in event_rows
- if state_key is None and not self.hs.is_mine_id(event_id)
+ if state_key is None and (
+ delete_local_events or not self.hs.is_mine_id(event_id)
+ )
]
logger.info(
- "[purge] found %i events before cutoff, of which %i are remote"
- " non-state events to delete", len(event_rows), len(to_delete))
-
- for event_id, state_key in event_rows:
- txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+ "[purge] found %i events before cutoff, of which %i can be deleted",
+ len(event_rows), len(to_delete),
+ )
- logger.debug("[purge] Finding new backward extremities")
+ logger.info("[purge] Finding new backward extremities")
# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
@@ -2148,7 +2148,7 @@ class EventsStore(SQLBaseStore):
)
new_backwards_extrems = txn.fetchall()
- logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+ logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
txn.execute(
"DELETE FROM event_backward_extremities WHERE room_id = ?",
@@ -2164,7 +2164,7 @@ class EventsStore(SQLBaseStore):
]
)
- logger.debug("[purge] finding redundant state groups")
+ logger.info("[purge] finding redundant state groups")
# Get all state groups that are only referenced by events that are
# to be deleted.
@@ -2181,15 +2181,15 @@ class EventsStore(SQLBaseStore):
)
state_rows = txn.fetchall()
- logger.debug("[purge] found %i redundant state groups", len(state_rows))
+ logger.info("[purge] found %i redundant state groups", len(state_rows))
# make a set of the redundant state groups, so that we can look them up
# efficiently
state_groups_to_delete = set([sg for sg, in state_rows])
# Now we get all the state groups that rely on these state groups
- logger.debug("[purge] finding state groups which depend on redundant"
- " state groups")
+ logger.info("[purge] finding state groups which depend on redundant"
+ " state groups")
remaining_state_groups = []
for i in xrange(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
@@ -2214,7 +2214,7 @@ class EventsStore(SQLBaseStore):
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
- logger.debug("[purge] de-delta-ing remaining state group %s", sg)
+ logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(
txn, [sg], types=None
)
@@ -2251,7 +2251,7 @@ class EventsStore(SQLBaseStore):
],
)
- logger.debug("[purge] removing redundant state groups")
+ logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
@@ -2261,18 +2261,15 @@ class EventsStore(SQLBaseStore):
state_rows
)
- # Delete all non-state
- logger.debug("[purge] removing events from event_to_state_groups")
+ logger.info("[purge] removing events from event_to_state_groups")
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
)
-
- logger.debug("[purge] updating room_depth")
- txn.execute(
- "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
- (topological_ordering, room_id,)
- )
+ for event_id, _ in event_rows:
+ txn.call_after(self._get_state_group_for_event.invalidate, (
+ event_id,
+ ))
# Delete all remote non-state events
for table in (
@@ -2290,7 +2287,7 @@ class EventsStore(SQLBaseStore):
"event_signatures",
"rejections",
):
- logger.debug("[purge] removing remote non-state events from %s", table)
+ logger.info("[purge] removing events from %s", table)
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -2298,16 +2295,30 @@ class EventsStore(SQLBaseStore):
)
# Mark all state and own events as outliers
- logger.debug("[purge] marking remaining events as outliers")
+ logger.info("[purge] marking remaining events as outliers")
txn.executemany(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
[
(True, event_id,) for event_id, state_key in event_rows
- if state_key is not None or self.hs.is_mine_id(event_id)
+ if state_key is not None or (
+ not delete_local_events and self.hs.is_mine_id(event_id)
+ )
]
)
+ # synapse tries to take out an exclusive lock on room_depth whenever it
+ # persists events (because upsert), and once we run this update, we
+ # will block that for the rest of our transaction.
+ #
+ # So, let's stick it at the end so that we don't block event
+ # persistence.
+ logger.info("[purge] updating room_depth")
+ txn.execute(
+ "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+ (topological_ordering, room_id,)
+ )
+
logger.info("[purge] done")
@defer.inlineCallbacks
|