Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/event_federation.py    63
-rw-r--r--  synapse/storage/event_push_actions.py  24
-rw-r--r--  synapse/storage/events.py              49
-rw-r--r--  synapse/storage/room.py                 6
-rw-r--r--  synapse/storage/search.py               2
-rw-r--r--  synapse/storage/stream.py               2
6 files changed, 109 insertions(+), 37 deletions(-)
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 00ee82d300..8fbf7ffba7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import random
 
 from twisted.internet import defer
 
@@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
 
 import logging
-from Queue import PriorityQueue, Empty
+from six.moves.queue import PriorityQueue, Empty
+
+from six.moves import range
 
 logger = logging.getLogger(__name__)
 
@@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             front_list = list(front)
             chunks = [
                 front_list[x:x + 100]
-                for x in xrange(0, len(front), 100)
+                for x in range(0, len(front), 100)
             ]
             for chunk in chunks:
                 txn.execute(
@@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             retcol="event_id",
         )
 
+    @defer.inlineCallbacks
+    def get_prev_events_for_room(self, room_id):
+        """
+        Gets a subset of the current forward extremities in the given room.
+
+        Limits the result to 10 extremities, so that we can avoid creating
+        events which refer to hundreds of prev_events.
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+        res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
+        if len(res) > 10:
+            # Sort by reverse depth, so we point to the most recent.
+            res.sort(key=lambda a: -a[2])
+
+            # we use half of the limit for the actual most recent events, and
+            # the other half to randomly point to some of the older events, to
+            # make sure that we don't completely ignore the older events.
+            res = res[0:5] + random.sample(res[5:], 5)
+
+        defer.returnValue(res)
+
     def get_latest_event_ids_and_hashes_in_room(self, room_id):
+        """
+        Gets the current forward extremities in the given room
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+
         return self.runInteraction(
             "get_latest_event_ids_and_hashes_in_room",
             self._get_latest_event_ids_and_hashes_in_room,
@@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             room_id,
         )
 
-    @defer.inlineCallbacks
-    def get_max_depth_of_events(self, event_ids):
-        sql = (
-            "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
-        ) % (",".join(["?"] * len(event_ids)),)
-
-        rows = yield self._execute(
-            "get_max_depth_of_events", None,
-            sql, *event_ids
-        )
-
-        if rows:
-            defer.returnValue(rows[0][0])
-        else:
-            defer.returnValue(1)
-
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
            txn,
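
The interesting change above is get_prev_events_for_room: rather than pointing a
new event at every forward extremity, it caps prev_events at 10 by keeping the
five deepest (most recent) extremities plus a random sample of five older ones,
so stale extremities still get referenced eventually. A minimal standalone
sketch of that capping strategy (plain Python, no Twisted; cap_extremities and
the generic limit parameter are illustrative, not part of the patch):

    import random

    def cap_extremities(extremities, limit=10):
        # extremities: list of (event_id, hashes, depth) tuples, as returned
        # by get_latest_event_ids_and_hashes_in_room
        if len(extremities) <= limit:
            return extremities
        # sort by reverse depth, so the most recent events come first
        extremities = sorted(extremities, key=lambda e: -e[2])
        half = limit // 2
        # half the budget goes to the most recent events; the other half is
        # randomly sampled from the older ones so they aren't starved forever
        return extremities[:half] + random.sample(extremities[half:], half)
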
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index e78f8d0114..c22762eb5c 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             "add_push_actions_to_staging", _add_push_actions_to_staging_txn
         )
 
+    @defer.inlineCallbacks
     def remove_push_actions_from_staging(self, event_id):
         """Called if we failed to persist the event to ensure that stale push
         actions don't build up in the DB
@@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             event_id (str)
         """
 
-        return self._simple_delete(
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event_id,
-            },
-            desc="remove_push_actions_from_staging",
-        )
+        try:
+            res = yield self._simple_delete(
+                table="event_push_actions_staging",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                desc="remove_push_actions_from_staging",
+            )
+            defer.returnValue(res)
+        except Exception:
+            # this method is called from an exception handler, so propagating
+            # another exception here really isn't helpful - there's nothing
+            # the caller can do about it. Just log the exception and move on.
+            logger.exception(
+                "Error removing push actions after event persistence failure",
+            )
 
     @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
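
This change applies a common cleanup-path pattern: the method is invoked from
an exception handler, so letting a second exception escape would only mask the
original failure. A minimal sketch of the pattern in isolation
(remove_staging_row and delete_fn are placeholders, not Synapse APIs):

    import logging

    logger = logging.getLogger(__name__)

    def remove_staging_row(delete_fn, event_id):
        """Best-effort cleanup, safe to call from an except block."""
        try:
            return delete_fn(event_id)
        except Exception:
            # We are already handling a persistence failure; raising here
            # would mask the original error, so just record this one.
            logger.exception("Error removing staged push actions")
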
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index da44b52fd6..5fe4a0e56c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -16,6 +16,7 @@
 
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps
+import itertools
 import logging
 
 import simplejson as json
@@ -1320,13 +1321,49 @@ class EventsStore(EventsWorkerStore):
 
         defer.returnValue(set(r["event_id"] for r in rows))
 
-    def have_events(self, event_ids):
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
         """Given a list of event ids, check if we have already processed them.
 
+        Args:
+            event_ids (iterable[str]):
+
         Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-                the rejected reason string if we rejected the event, else maps to
-                None.
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
+        Returns:
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
         """
         if not event_ids:
             return defer.succeed({})
@@ -1348,9 +1385,7 @@ class EventsStore(EventsWorkerStore):
 
             return res
 
-        return self.runInteraction(
-            "have_events", f,
-        )
+        return self.runInteraction("get_rejection_reasons", f)
 
     @defer.inlineCallbacks
     def count_daily_messages(self):
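
The chunking idiom in have_seen_events is worth pulling out: two-argument
iter() calls the lambda repeatedly until it returns the sentinel value ([]),
so the loop yields successive 100-item lists and stops cleanly when the input
runs out. A minimal sketch of the same idiom (the chunks helper is
illustrative, not part of the patch):

    import itertools

    def chunks(iterable, size=100):
        it = iter(iterable)
        # islice consumes up to `size` items per call; two-argument iter()
        # ends the loop when the lambda returns the empty-list sentinel
        for chunk in iter(lambda: list(itertools.islice(it, size)), []):
            yield chunk

    # e.g. list(chunks(range(7), 3)) == [[0, 1, 2], [3, 4, 5], [6]]
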
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 740c036975..ea6a189185 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
         # Convert the IDs to MXC URIs
         for media_id in local_mxcs:
-            local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+            local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
         for hostname, media_id in remote_mxcs:
             remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
 
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         while next_token:
             sql = """
                 SELECT stream_ordering, json FROM events
-                JOIN event_json USING (event_id)
+                JOIN event_json USING (room_id, event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 if matches:
                     hostname = matches.group(1)
                     media_id = matches.group(2)
-                    if hostname == self.hostname:
+                    if hostname == self.hs.hostname:
                         local_media_mxcs.append(media_id)
                     else:
                         remote_media_mxcs.append((hostname, media_id))
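
For context on the self.hs.hostname fix: the loop above parses
mxc://<server-name>/<media-id> URIs and splits them into local and remote
media by comparing the server name against the homeserver's own hostname. A
standalone sketch of that classification (classify_mxc and its regex are
illustrative, not the patch's code):

    import re

    MXC_RE = re.compile(r"^mxc://([^/]+)/([^/#?]+)$")

    def classify_mxc(uri, local_hostname):
        match = MXC_RE.match(uri)
        if not match:
            return None
        hostname, media_id = match.group(1), match.group(2)
        if hostname == local_hostname:
            return ("local", media_id)
        return ("remote", hostname, media_id)
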
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 426cbe6e1a..6ba3e59889 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
             sql = (
                 "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
-                " JOIN event_json USING (event_id)"
+                " JOIN event_json USING (room_id, event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
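
This is the same join fix as in room.py above: event_json is now joined on the
composite (room_id, event_id) key rather than on event_id alone. SQL's
USING (a, b) is shorthand for an ON clause equating both columns (and
coalescing them in the output), so the query above is roughly equivalent to
the spelled-out form below (illustrative only, not the patch's code):

    # roughly equivalent ON-clause form of the USING join above
    sql = (
        "SELECT events.stream_ordering, events.event_id, events.room_id,"
        " events.type, event_json.json, events.origin_server_ts"
        " FROM events"
        " JOIN event_json ON events.room_id = event_json.room_id"
        " AND events.event_id = event_json.event_id"
        " WHERE ? <= stream_ordering AND stream_ordering < ?"
    )
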
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 2956c3b3e0..3b8b539993 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -202,7 +202,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                     room_id, from_key, to_key, limit, order=order,
                 )
                 for room_id in rm_ids
-            ]))
+            ], consumeErrors=True))
 
         results.update(dict(zip(rm_ids, res)))
         defer.returnValue(results)
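
consumeErrors=True here is the standard Twisted idiom when gathering parallel
queries: defer.gatherResults still fails fast with a defer.FirstError when any
input Deferred fails, but the underlying failures are marked as consumed so
they are not additionally reported as unhandled errors in Deferreds. A minimal
sketch (gather_per_room is illustrative):

    from twisted.internet import defer

    def gather_per_room(deferreds):
        # Fails with defer.FirstError if any input fails, but
        # consumeErrors=True stops the individual failures from also being
        # logged as "Unhandled error in Deferred" when garbage collected.
        return defer.gatherResults(deferreds, consumeErrors=True)
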