diff options
author | Neil Johnson <neil@fragile.org.uk> | 2018-05-14 09:31:42 +0100 |
---|---|---|
committer | Neil Johnson <neil@fragile.org.uk> | 2018-05-14 09:31:42 +0100 |
commit | 977765bde2987770f63065d839f9686a7a144140 (patch) | |
tree | 41d3a247f546cfe50500f465e50a798a597ef464 /synapse/app | |
parent | remove user agent from data model, will just join on user_ips (diff) | |
parent | Merge pull request #2846 from kaiyou/feat-dockerfile (diff) | |
download | synapse-977765bde2987770f63065d839f9686a7a144140.tar.xz |
Merge branch 'develop' of https://github.com/matrix-org/synapse into cohort_analytics
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/appservice.py | 16 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 1 | ||||
-rw-r--r-- | synapse/app/event_creator.py | 1 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 1 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 32 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 1 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 2 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 1 | ||||
-rw-r--r-- | synapse/app/pusher.py | 36 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 101 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 14 |
11 files changed, 119 insertions, 87 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index f2540023a7..b1efacc9f8 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.appservice") @@ -74,6 +74,7 @@ class AppserviceServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) @@ -112,9 +113,14 @@ class ASReplicationHandler(ReplicationClientHandler): if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() - preserve_fn( - self.appservice_handler.notify_interested_services - )(max_stream_id) + run_in_background(self._notify_app_services, max_stream_id) + + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") def start(config_options): diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 267d34c881..38b98382c6 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -98,6 +98,7 @@ class ClientReaderServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index b915d12d53..bd7f3d5679 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -114,6 +114,7 @@ class EventCreatorServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index c1dc66dd17..6e10b27b9e 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -87,6 +87,7 @@ class FederationReaderServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..6f24e32d6d 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -101,6 +101,7 @@ class FederationSenderServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) @@ -229,7 +230,7 @@ class FederationSenderHandler(object): # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - preserve_fn(self.update_token)(token) + run_in_background(self.update_token, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": @@ -237,19 +238,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): - self.federation_position = token - - # We linearize here to ensure we don't have races updating the token - with (yield self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + try: + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (yield self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) - self._last_ack = self.federation_position + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position + except Exception: + logger.exception("Error updating federation stream position") if __name__ == '__main__': diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index b349e3e3ce..0f700ee786 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -152,6 +152,7 @@ class FrontendProxyServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 328ab8032c..f785a7a22b 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -140,6 +140,7 @@ class SynapseHomeServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ), self.tls_server_context_factory, ) @@ -153,6 +154,7 @@ class SynapseHomeServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) logger.info("Synapse now listening on port %d", port) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index fc8282bbc1..9c93195f0a 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -94,6 +94,7 @@ class MediaRepositoryServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..3912eae48c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -33,7 +33,7 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -104,6 +104,7 @@ class PusherServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) @@ -140,24 +141,27 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - preserve_fn(self.poke_pushers)(stream_name, token, rows) + run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) - ) + try: + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, + ) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) + ) + except Exception: + logger.exception("Error poking pushers") def stop_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..c6294a7a0c 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string @@ -281,6 +281,7 @@ class SynchrotronServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) @@ -327,8 +328,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - - preserve_fn(self.process_and_notify)(stream_name, token, rows) + run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() @@ -340,55 +340,58 @@ class SyncReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def process_and_notify(self, stream_name, token, rows): - if stream_name == "events": - # We shouldn't get multiple rows per token for events stream, so - # we don't need to optimise this for multiple rows. - for row in rows: - event = yield self.store.get_event(row.event_id) - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event( - event, token, max_token, extra_users + try: + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + event = yield self.store.get_event(row.event_id) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "push_rules": - self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], - ) - elif stream_name in ("account_data", "tag_account_data",): - self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], - ) - elif stream_name == "receipts": - self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "to_device": - entities = [row.entity for row in rows if row.entity.startswith("@")] - if entities: + elif stream_name in ("account_data", "tag_account_data",): self.notifier.on_new_event( - "to_device_key", token, users=entities, + "account_data_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "device_lists": - all_room_ids = set() - for row in rows: - room_ids = yield self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) - elif stream_name == "presence": - yield self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": - self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows], - ) + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event( + "to_device_key", token, users=entities, + ) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = yield self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event( + "device_list_key", token, rooms=all_room_ids, + ) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) + except Exception: + logger.exception("Error processing replication") def start(config_options): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 5f845e80d1..53eb3474da 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.user_dir") @@ -126,6 +126,7 @@ class UserDirectoryServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) @@ -164,7 +165,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): stream_name, token, rows ) if stream_name == "current_state_deltas": - preserve_fn(self.user_directory.notify_new_event)() + run_in_background(self._notify_directory) + + @defer.inlineCallbacks + def _notify_directory(self): + try: + yield self.user_directory.notify_new_event() + except Exception: + logger.exception("Error notifiying user directory of state update") def start(config_options): |