From 45c7f12d2a7243b4d273d8af1639c03f3136d2a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Mar 2017 16:24:54 +0000 Subject: Add new storage function to slave store --- synapse/replication/slave/storage/events.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 622b2d8540..518c9ea2e9 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -109,6 +109,10 @@ class SlavedEventStore(BaseSlavedStore): get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] ) + get_current_state_ids = ( + StateStore.__dict__["get_current_state_ids"] + ) + has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__ -- cgit 1.4.1 From 29ed09e80a8c7ddeebe3f257a336e4c387a06c88 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Mar 2017 12:16:55 +0000 Subject: Fix assertion to stop transaction queue getting wedged ... and update some docstrings to correctly reflect the types being used. get_new_device_msgs_for_remote can return a long under some circumstances, which was being stored in last_device_list_stream_id_by_dest, and was then upsetting things on the next loop. --- synapse/federation/transaction_queue.py | 5 +++++ synapse/replication/slave/storage/_slaved_id_tracker.py | 5 +++++ synapse/storage/deviceinbox.py | 6 +++--- synapse/storage/devices.py | 2 +- synapse/storage/util/id_generators.py | 14 ++++++++++++++ synapse/util/caches/stream_change_cache.py | 2 +- 6 files changed, 29 insertions(+), 5 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 90235ff098..c802dd67a3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -99,7 +99,12 @@ class TransactionQueue(object): # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} + # destination -> stream_id of last successfully sent to-device message. + # NB: may be a long or an int. self.last_device_stream_id_by_dest = {} + + # destination -> stream_id of last successfully sent device list + # update. self.last_device_list_stream_id_by_dest = {} # HACK to get unique tx id diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py index 24b5c79d4a..9d1d173b2f 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -27,4 +27,9 @@ class SlavedIdTracker(object): self._current = (max if self.step > 0 else min)(self._current, new_id) def get_current_token(self): + """ + + Returns: + int + """ return self._current diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 5c7db5e5f6..7925cb5f1b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore): """ Args: destination(str): The name of the remote server. - last_stream_id(int): The last position of the device message stream + last_stream_id(int|long): The last position of the device message stream that the server sent up to. - current_stream_id(int): The current position of the device + current_stream_id(int|long): The current position of the device message stream. Returns: - Deferred ([dict], int): List of messages for the device and where + Deferred ([dict], int|long): List of messages for the device and where in the stream the messages got to. """ diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 563071b7a9..e545b62e39 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -308,7 +308,7 @@ class DeviceStore(SQLBaseStore): """Get stream of updates to send to remote servers Returns: - (now_stream_id, [ { updates }, .. ]) + (int, list[dict]): current stream id and list of updates """ now_stream_id = self._device_list_id_gen.get_current_token() diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 46cf93ff87..95031dc9ec 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -30,6 +30,17 @@ class IdGenerator(object): def _load_current_id(db_conn, table, column, step=1): + """ + + Args: + db_conn (object): + table (str): + column (str): + step (int): + + Returns: + int + """ cur = db_conn.cursor() if step == 1: cur.execute("SELECT MAX(%s) FROM %s" % (column, table,)) @@ -131,6 +142,9 @@ class StreamIdGenerator(object): def get_current_token(self): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. + + Returns: + int """ with self._lock: if self._unfinished_ids: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b72bb0ff02..70fe00ce0b 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -50,7 +50,7 @@ class StreamChangeCache(object): def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ - assert type(stream_pos) is int + assert type(stream_pos) is int or type(stream_pos) is long if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() -- cgit 1.4.1 From 61f471f7793e7cd24d4dde5afd34cea1e46df724 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Mar 2017 15:47:51 +0000 Subject: Don't send the full event json over replication --- synapse/app/synchrotron.py | 11 +++++--- synapse/replication/resource.py | 4 +-- synapse/replication/slave/storage/events.py | 42 ++++++++++------------------- synapse/storage/events.py | 31 ++++++++++----------- 4 files changed, 38 insertions(+), 50 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 449fac771b..c75b7c04cd 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -410,11 +410,16 @@ class SynchrotronServer(HomeServer): stream = result.get("events") if stream: max_position = stream["position"] + + event_map = yield store.get_events([row[1] for row in stream["rows"]]) + for row in stream["rows"]: position = row[0] - internal = json.loads(row[1]) - event_json = json.loads(row[2]) - event = FrozenEvent(event_json, internal_metadata_dict=internal) + event_id = row[1] + event = event_map.get(event_id, None) + if not event: + continue + extra_users = () if event.type == EventTypes.Member: extra_users = (event.state_key,) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index d8eb14592b..03930fe958 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -283,12 +283,12 @@ class ReplicationResource(Resource): if request_events != upto_events_token: writer.write_header_and_rows("events", res.new_forward_events, ( - "position", "internal", "json", "state_group" + "position", "event_id", "room_id", "type", "state_key", ), position=upto_events_token) if request_backfill != upto_backfill_token: writer.write_header_and_rows("backfill", res.new_backfill_events, ( - "position", "internal", "json", "state_group", + "position", "event_id", "room_id", "type", "state_key", "redacts", ), position=upto_backfill_token) writer.write_header_and_rows( diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 518c9ea2e9..a513165a14 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -242,46 +242,32 @@ class SlavedEventStore(BaseSlavedStore): return super(SlavedEventStore, self).process_replication(result) def _process_replication_row(self, row, backfilled): - internal = json.loads(row[1]) - event_json = json.loads(row[2]) - event = FrozenEvent(event_json, internal_metadata_dict=internal) + stream_ordering = row[0] if not backfilled else -row[0] self.invalidate_caches_for_event( - event, backfilled, + stream_ordering, row[1], row[2], row[3], row[4], row[5], + backfilled=backfilled, ) - def invalidate_caches_for_event(self, event, backfilled): - self._invalidate_get_event_cache(event.event_id) + def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, + etype, state_key, redacts, backfilled): + self._invalidate_get_event_cache(event_id) - self.get_latest_event_ids_in_room.invalidate((event.room_id,)) + self.get_latest_event_ids_in_room.invalidate((room_id,)) self.get_unread_event_push_actions_by_room_for_user.invalidate_many( - (event.room_id,) + (room_id,) ) if not backfilled: self._events_stream_cache.entity_has_changed( - event.room_id, event.internal_metadata.stream_ordering + room_id, stream_ordering ) - # self.get_unread_event_push_actions_by_room_for_user.invalidate_many( - # (event.room_id,) - # ) + if redacts: + self._invalidate_get_event_cache(redacts) - if event.type == EventTypes.Redaction: - self._invalidate_get_event_cache(event.redacts) - - if event.type == EventTypes.Member: + if etype == EventTypes.Member: self._membership_stream_cache.entity_has_changed( - event.state_key, event.internal_metadata.stream_ordering + state_key, stream_ordering ) - self.get_invited_rooms_for_user.invalidate((event.state_key,)) - - if not event.is_state(): - return - - if backfilled: - return - - if (not event.internal_metadata.is_invite_from_remote() - and event.internal_metadata.is_outlier()): - return + self.get_invited_rooms_for_user.invalidate((state_key,)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 72319c35ae..e8a7717640 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1620,14 +1620,13 @@ class EventsStore(SQLBaseStore): def get_all_new_events_txn(txn): sql = ( - "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group" - " FROM events as e" - " JOIN event_json as ej" - " ON e.event_id = ej.event_id AND e.room_id = ej.room_id" - " LEFT JOIN event_to_state_groups as eg" - " ON e.event_id = eg.event_id" - " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?" - " ORDER BY e.stream_ordering ASC" + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" " LIMIT ?" ) if have_forward_events: @@ -1653,15 +1652,13 @@ class EventsStore(SQLBaseStore): forward_ex_outliers = [] sql = ( - "SELECT -e.stream_ordering, ej.internal_metadata, ej.json," - " eg.state_group" - " FROM events as e" - " JOIN event_json as ej" - " ON e.event_id = ej.event_id AND e.room_id = ej.room_id" - " LEFT JOIN event_to_state_groups as eg" - " ON e.event_id = eg.event_id" - " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?" - " ORDER BY e.stream_ordering DESC" + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " ORDER BY stream_ordering DESC" " LIMIT ?" ) if have_backfill_events: -- cgit 1.4.1 From aac6d1fc9b376ce0f1cd4df06f10f41e250db77f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 20 Mar 2017 13:47:56 +0000 Subject: PEP8 --- synapse/app/synchrotron.py | 1 - synapse/replication/slave/storage/events.py | 2 -- 2 files changed, 3 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c75b7c04cd..9f93b311d1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite from synapse.http.server import JsonResource diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index a513165a14..a1e1e54e5b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,7 +16,6 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.api.constants import EventTypes -from synapse.events import FrozenEvent from synapse.storage import DataStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore @@ -25,7 +24,6 @@ from synapse.storage.state import StateStore from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache -import ujson as json import logging -- cgit 1.4.1 From d58b1ffe9424453526026e294ac9b6458d31eb9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Mar 2017 11:07:02 +0000 Subject: Replace some calls to cursor_to_dict cursor_to_dict can be surprisinglh expensive for large result sets, so lets only call it when we need to. --- synapse/replication/slave/storage/events.py | 1 - synapse/storage/roommember.py | 43 ++++++----------------------- synapse/storage/state.py | 8 +++--- 3 files changed, 13 insertions(+), 39 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index a1e1e54e5b..d4db1e452e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -167,7 +167,6 @@ class SlavedEventStore(BaseSlavedStore): _get_rooms_for_user_where_membership_is_txn = ( DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) - _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ _get_state_for_groups = DataStore._get_state_for_groups.__func__ _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ _get_events_around_txn = DataStore._get_events_around_txn.__func__ diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index e38d8927bf..23127d3a95 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -132,14 +132,17 @@ class RoomMemberStore(SQLBaseStore): @cached(max_entries=500000, iterable=True) def get_users_in_room(self, room_id): def f(txn): - - rows = self._get_members_rows_txn( - txn, - room_id=room_id, - membership=Membership.JOIN, + sql = ( + "SELECT m.user_id FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id " + " AND m.room_id = c.room_id " + " AND m.user_id = c.state_key" + " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" ) - return [r["user_id"] for r in rows] + txn.execute(sql, (room_id, Membership.JOIN,)) + return [r[0] for r in txn] return self.runInteraction("get_users_in_room", f) @cached() @@ -246,34 +249,6 @@ class RoomMemberStore(SQLBaseStore): return results - def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None): - where_clause = "c.room_id = ?" - where_values = [room_id] - - if membership: - where_clause += " AND m.membership = ?" - where_values.append(membership) - - if user_id: - where_clause += " AND m.user_id = ?" - where_values.append(user_id) - - sql = ( - "SELECT m.* FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id " - " AND m.room_id = c.room_id " - " AND m.user_id = c.state_key" - " WHERE c.type = 'm.room.member' AND %(where)s" - ) % { - "where": where_clause, - } - - txn.execute(sql, where_values) - rows = self.cursor_to_dict(txn) - - return rows - @cachedInlineCallbacks(max_entries=500000, iterable=True) def get_rooms_for_user(self, user_id): """Returns a set of room_ids the user is currently joined to diff --git a/synapse/storage/state.py b/synapse/storage/state.py index aedd597ded..314216f039 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -342,10 +342,10 @@ class StateStore(SQLBaseStore): args.extend(where_args) txn.execute(sql % (where_clause,), args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] + for row in txn: + typ, state_key, event_id = row + key = (typ, state_key) + results[group][key] = event_id else: if types is not None: where_clause = "AND (%s)" % ( -- cgit 1.4.1 From 09f79aaad02ee640b22e7762f28a0b6885b9593b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Mar 2017 13:21:08 +0000 Subject: Use presence replication stream to invalidate cache Instead of using the cache invalidation replication stream to invalidate the _get_presence_cache, we can instead rely on the presence replication stream. This reduces the amount of replication traffic considerably. --- synapse/replication/slave/storage/presence.py | 1 + synapse/storage/presence.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 40f6c9a386..e4a2414d78 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -57,5 +57,6 @@ class SlavedPresenceStore(BaseSlavedStore): self.presence_stream_cache.entity_has_changed( user_id, position ) + self._get_presence_for_user.invalidate((user_id,)) return super(SlavedPresenceStore, self).process_replication(result) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 4d1590d2b4..4edfe29585 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -85,8 +85,8 @@ class PresenceStore(SQLBaseStore): self.presence_stream_cache.entity_has_changed, state.user_id, stream_id, ) - self._invalidate_cache_and_stream( - txn, self._get_presence_for_user, (state.user_id,) + txn.call_after( + self._get_presence_for_user, (state.user_id,) ) # Actually insert new rows -- cgit 1.4.1