diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/appservice.py | 12 | ||||
-rw-r--r-- | synapse/storage/events.py | 44 | ||||
-rw-r--r-- | synapse/storage/state.py | 52 |
3 files changed, 64 insertions, 44 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a854a87eab..3d5994a580 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): ) def get_app_services(self): - return defer.succeed(self.services_cache) + return self.services_cache def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -54,8 +54,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.sender == user_id: - return defer.succeed(service) - return defer.succeed(None) + return service + return None def get_app_service_by_token(self, token): """Get the application service with the given appservice token. @@ -67,8 +67,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.token == token: - return defer.succeed(service) - return defer.succeed(None) + return service + return None def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -163,7 +163,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore - as_list = yield self.get_app_services() + as_list = self.get_app_services() services = [] for res in results: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6dc46fa50f..6cf9d1176d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1355,39 +1355,53 @@ class EventsStore(SQLBaseStore): min_stream_id = rows[-1][0] event_ids = [row[1] for row in rows] - events = self._get_events_txn(txn, event_ids) + rows_to_update = [] - rows = [] - for event in events: - try: - event_id = event.event_id - origin_server_ts = event.origin_server_ts - except (KeyError, AttributeError): - # If the event is missing a necessary field then - # skip over it. - continue + chunks = [ + event_ids[i:i + 100] + for i in xrange(0, len(event_ids), 100) + ] + for chunk in chunks: + ev_rows = self._simple_select_many_txn( + txn, + table="event_json", + column="event_id", + iterable=chunk, + retcols=["event_id", "json"], + keyvalues={}, + ) - rows.append((origin_server_ts, event_id)) + for row in ev_rows: + event_id = row["event_id"] + event_json = json.loads(row["json"]) + try: + origin_server_ts = event_json["origin_server_ts"] + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + rows_to_update.append((origin_server_ts, event_id)) sql = ( "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" ) - for index in range(0, len(rows), INSERT_CLUMP_SIZE): - clump = rows[index:index + INSERT_CLUMP_SIZE] + for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE): + clump = rows_to_update[index:index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, - "rows_inserted": rows_inserted + len(rows) + "rows_inserted": rows_inserted + len(rows_to_update) } self._background_update_progress_txn( txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress ) - return len(rows) + return len(rows_to_update) result = yield self.runInteraction( self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7eb342674c..49abf0ac74 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -307,6 +307,9 @@ class StateStore(SQLBaseStore): def _get_state_groups_from_groups_txn(self, txn, groups, types=None): results = {group: {} for group in groups} + if types is not None: + types = list(set(types)) # deduplicate types list + if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is # a temporary hack until we can add the right indices in @@ -375,10 +378,35 @@ class StateStore(SQLBaseStore): # We don't use WITH RECURSIVE on sqlite3 as there are distributions # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) for group in groups: - group_tree = [group] next_group = group while next_group: + # We did this before by getting the list of group ids, and + # then passing that list to sqlite to get latest event for + # each (type, state_key). However, that was terribly slow + # without the right indicies (which we can't add until + # after we finish deduping state, which requires this func) + args = [next_group] + if types: + args.extend(i for typ in types for i in typ) + + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ? %s" % (where_clause,), + args + ) + rows = txn.fetchall() + results[group].update({ + (typ, state_key): event_id + for typ, state_key, event_id in rows + if (typ, state_key) not in results[group] + }) + + # If the lengths match then we must have all the types, + # so no need to go walk further down the tree. + if types is not None and len(results[group]) == len(types): + break + next_group = self._simple_select_one_onecol_txn( txn, table="state_group_edges", @@ -386,28 +414,6 @@ class StateStore(SQLBaseStore): retcol="prev_state_group", allow_none=True, ) - if next_group: - group_tree.append(next_group) - - sql = (""" - SELECT type, state_key, event_id FROM state_groups_state - INNER JOIN ( - SELECT type, state_key, max(state_group) as state_group - FROM state_groups_state - WHERE state_group IN (%s) %s - GROUP BY type, state_key - ) USING (type, state_key, state_group); - """) % (",".join("?" for _ in group_tree), where_clause,) - - args = list(group_tree) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] return results |