diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 4 | ||||
-rw-r--r-- | synapse/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/storage/engines/__init__.py | 5 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 63 | ||||
-rw-r--r-- | synapse/storage/events.py | 52 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 20 | ||||
-rw-r--r-- | synapse/storage/room.py | 6 | ||||
-rw-r--r-- | synapse/storage/search.py | 2 |
8 files changed, 122 insertions, 32 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index eacd49d6a5..8cdfd50f90 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore, def count_r30_users(self): """ Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days + * Users who have created their accounts more than 30 days ago * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days + * Where account creation and last_seen are > 30 days apart Returns counts globaly for a given user as well as breaking by platform diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2fbebd4907..2262776ab2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -376,7 +376,7 @@ class SQLBaseStore(object): Returns: A list of dicts where the key is the column header. """ - col_headers = list(intern(column[0]) for column in cursor.description) + col_headers = list(intern(str(column[0])) for column in cursor.description) results = list( dict(zip(col_headers, row)) for row in cursor ) diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 338b495611..8c868ece75 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -18,6 +18,7 @@ from .postgres import PostgresEngine from .sqlite3 import Sqlite3Engine import importlib +import platform SUPPORTED_MODULE = { @@ -31,6 +32,10 @@ def create_engine(database_config): engine_class = SUPPORTED_MODULE.get(name, None) if engine_class: + # pypy requires psycopg2cffi rather than psycopg2 + if (name == "psycopg2" and + platform.python_implementation() == "PyPy"): + name = "psycopg2cffi" module = importlib.import_module(name) return engine_class(module, database_config) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 00ee82d300..8fbf7ffba7 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import random from twisted.internet import defer @@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 import logging -from Queue import PriorityQueue, Empty +from six.moves.queue import PriorityQueue, Empty + +from six.moves import range logger = logging.getLogger(__name__) @@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, front_list = list(front) chunks = [ front_list[x:x + 100] - for x in xrange(0, len(front), 100) + for x in range(0, len(front), 100) ] for chunk in chunks: txn.execute( @@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, retcol="event_id", ) + @defer.inlineCallbacks + def get_prev_events_for_room(self, room_id): + """ + Gets a subset of the current forward extremities in the given room. + + Limits the result to 10 extremities, so that we can avoid creating + events which refer to hundreds of prev_events. + + Args: + room_id (str): room_id + + Returns: + Deferred[list[(str, dict[str, str], int)]] + for each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + """ + res = yield self.get_latest_event_ids_and_hashes_in_room(room_id) + if len(res) > 10: + # Sort by reverse depth, so we point to the most recent. + res.sort(key=lambda a: -a[2]) + + # we use half of the limit for the actual most recent events, and + # the other half to randomly point to some of the older events, to + # make sure that we don't completely ignore the older events. + res = res[0:5] + random.sample(res[5:], 5) + + defer.returnValue(res) + def get_latest_event_ids_and_hashes_in_room(self, room_id): + """ + Gets the current forward extremities in the given room + + Args: + room_id (str): room_id + + Returns: + Deferred[list[(str, dict[str, str], int)]] + for each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + """ + return self.runInteraction( "get_latest_event_ids_and_hashes_in_room", self._get_latest_event_ids_and_hashes_in_room, @@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, room_id, ) - @defer.inlineCallbacks - def get_max_depth_of_events(self, event_ids): - sql = ( - "SELECT MAX(depth) FROM events WHERE event_id IN (%s)" - ) % (",".join(["?"] * len(event_ids)),) - - rows = yield self._execute( - "get_max_depth_of_events", None, - sql, *event_ids - ) - - if rows: - defer.returnValue(rows[0][0]) - else: - defer.returnValue(1) - def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( txn, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ece5e6c41f..5fe4a0e56c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -16,6 +16,7 @@ from collections import OrderedDict, deque, namedtuple from functools import wraps +import itertools import logging import simplejson as json @@ -444,6 +445,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) for event, context in chunk: if context.app_service: origin_type = "local" @@ -1317,13 +1321,49 @@ class EventsStore(EventsWorkerStore): defer.returnValue(set(r["event_id"] for r in rows)) - def have_events(self, event_ids): + @defer.inlineCallbacks + def have_seen_events(self, event_ids): """Given a list of event ids, check if we have already processed them. + Args: + event_ids (iterable[str]): + + Returns: + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. """ if not event_ids: return defer.succeed({}) @@ -1345,9 +1385,7 @@ class EventsStore(EventsWorkerStore): return res - return self.runInteraction( - "have_events", f, - ) + return self.runInteraction("get_rejection_reasons", f) @defer.inlineCallbacks def count_daily_messages(self): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..a937b9bceb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event. + + Raises an exception for unknown events. + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timestamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 740c036975..ea6a189185 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore): # Convert the IDs to MXC URIs for media_id in local_mxcs: - local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id)) + local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id)) for hostname, media_id in remote_mxcs: remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id)) @@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore): while next_token: sql = """ SELECT stream_ordering, json FROM events - JOIN event_json USING (event_id) + JOIN event_json USING (room_id, event_id) WHERE room_id = ? AND stream_ordering < ? AND contains_url = ? AND outlier = ? @@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore): if matches: hostname = matches.group(1) media_id = matches.group(2) - if hostname == self.hostname: + if hostname == self.hs.hostname: local_media_mxcs.append(media_id) else: remote_media_mxcs.append((hostname, media_id)) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 426cbe6e1a..6ba3e59889 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT stream_ordering, event_id, room_id, type, json, " " origin_server_ts FROM events" - " JOIN event_json USING (event_id)" + " JOIN event_json USING (room_id, event_id)" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" |