diff options
-rwxr-xr-x | scripts/sync_room_to_group.pl | 5 | ||||
-rw-r--r-- | synapse/server.py | 31 | ||||
-rw-r--r-- | synapse/storage/account_data.py | 85 |
3 files changed, 68 insertions, 53 deletions
diff --git a/scripts/sync_room_to_group.pl b/scripts/sync_room_to_group.pl index 92891f3d19..f0c2dfadfa 100755 --- a/scripts/sync_room_to_group.pl +++ b/scripts/sync_room_to_group.pl @@ -26,6 +26,9 @@ my $group_users = [ (map { $_->{user_id} } @{decode_json($ua->get("${hs}/_matrix/client/unstable/groups/${group_id}/invited_users?access_token=${access_token}" )->decoded_content)->{chunk}}), ]; +die "refusing to sync from empty room" unless (@$room_users); +die "refusing to sync to empty group" unless (@$group_users); + my $diff = {}; foreach my $user (@$room_users) { $diff->{$user}++ } foreach my $user (@$group_users) { $diff->{$user}-- } @@ -39,4 +42,4 @@ foreach my $user (keys %$diff) { warn "removing $user"; print STDERR $ua->put("${hs}/_matrix/client/unstable/groups/${group_id}/admin/users/remove/${user}?access_token=${access_token}", Content=>'{}')->status_line."\n"; } -} \ No newline at end of file +} diff --git a/synapse/server.py b/synapse/server.py index 10e3e9a4f1..4746cc7b6c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -90,17 +90,12 @@ class HomeServer(object): """ DEPENDENCIES = [ - 'config', - 'clock', 'http_client', 'db_pool', - 'persistence_service', 'replication_layer', - 'datastore', 'handlers', 'v1auth', 'auth', - 'rest_servlet_factory', 'state_handler', 'presence_handler', 'sync_handler', @@ -118,18 +113,7 @@ class HomeServer(object): 'device_message_handler', 'profile_handler', 'notifier', - 'distributor', - 'client_resource', - 'resource_for_federation', - 'resource_for_static_content', - 'resource_for_web_client', - 'resource_for_content_repo', - 'resource_for_server_key', - 'resource_for_server_key_v2', - 'resource_for_media_repository', - 'resource_for_metrics', 'event_sources', - 'ratelimiter', 'keyring', 'pusherpool', 'event_builder_factory', @@ -183,6 +167,21 @@ class HomeServer(object): def is_mine_id(self, string): return string.split(":", 1)[1] == self.hostname + def get_clock(self): + return self.clock + + def get_datastore(self): + return self.datastore + + def get_config(self): + return self.config + + def get_distributor(self): + return self.distributor + + def get_ratelimiter(self): + return self.ratelimiter + def build_replication_layer(self): return initialize_http_replication(self) diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index c8a1eb016b..56a0bde549 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore): """ content_json = json.dumps(content) - def add_account_data_txn(txn, next_id): - self._simple_upsert_txn( - txn, + with self._account_data_id_gen.get_next() as next_id: + # no need to lock here as room_account_data has a unique constraint + # on (user_id, room_id, account_data_type) so _simple_upsert will + # retry if there is a conflict. + yield self._simple_upsert( + desc="add_room_account_data", table="room_account_data", keyvalues={ "user_id": user_id, @@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore): values={ "stream_id": next_id, "content": content_json, - } - ) - txn.call_after( - self._account_data_stream_cache.entity_has_changed, - user_id, next_id, + }, + lock=False, ) - txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) - self._update_max_stream_id(txn, next_id) - with self._account_data_id_gen.get_next() as next_id: - yield self.runInteraction( - "add_room_account_data", add_account_data_txn, next_id - ) + # it's theoretically possible for the above to succeed and the + # below to fail - in which case we might reuse a stream id on + # restart, and the above update might not get propagated. That + # doesn't sound any worse than the whole update getting lost, + # which is what would happen if we combined the two into one + # transaction. + yield self._update_max_stream_id(next_id) + + self._account_data_stream_cache.entity_has_changed(user_id, next_id) + self.get_account_data_for_user.invalidate((user_id,)) result = self._account_data_id_gen.get_current_token() defer.returnValue(result) @@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore): """ content_json = json.dumps(content) - def add_account_data_txn(txn, next_id): - self._simple_upsert_txn( - txn, + with self._account_data_id_gen.get_next() as next_id: + # no need to lock here as account_data has a unique constraint on + # (user_id, account_data_type) so _simple_upsert will retry if + # there is a conflict. + yield self._simple_upsert( + desc="add_user_account_data", table="account_data", keyvalues={ "user_id": user_id, @@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore): values={ "stream_id": next_id, "content": content_json, - } + }, + lock=False, ) - txn.call_after( - self._account_data_stream_cache.entity_has_changed, + + # it's theoretically possible for the above to succeed and the + # below to fail - in which case we might reuse a stream id on + # restart, and the above update might not get propagated. That + # doesn't sound any worse than the whole update getting lost, + # which is what would happen if we combined the two into one + # transaction. + yield self._update_max_stream_id(next_id) + + self._account_data_stream_cache.entity_has_changed( user_id, next_id, ) - txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) - txn.call_after( - self.get_global_account_data_by_type_for_user.invalidate, + self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_by_type_for_user.invalidate( (account_data_type, user_id,) ) - self._update_max_stream_id(txn, next_id) - - with self._account_data_id_gen.get_next() as next_id: - yield self.runInteraction( - "add_user_account_data", add_account_data_txn, next_id - ) result = self._account_data_id_gen.get_current_token() defer.returnValue(result) - def _update_max_stream_id(self, txn, next_id): + def _update_max_stream_id(self, next_id): """Update the max stream_id Args: - txn: The database cursor next_id(int): The the revision to advance to. """ - update_max_id_sql = ( - "UPDATE account_data_max_stream_id" - " SET stream_id = ?" - " WHERE stream_id < ?" + def _update(txn): + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) + return self.runInteraction( + "update_account_data_max_stream_id", + _update, ) - txn.execute(update_max_id_sql, (next_id, next_id)) @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): |