diff options
41 files changed, 425 insertions, 294 deletions
diff --git a/CHANGES.md b/CHANGES.md index 3cacca5a6b..1e9c3cf953 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +Synapse 0.99.4 (2019-05-15) +=========================== + +No significant changes. + + Synapse 0.99.4rc1 (2019-05-13) ============================== @@ -17,8 +23,8 @@ Features instead of the executable name, `python`. Contributed by Christoph Müller. ([\#5023](https://github.com/matrix-org/synapse/issues/5023)) - Add time-based account expiration. ([\#5027](https://github.com/matrix-org/synapse/issues/5027), [\#5047](https://github.com/matrix-org/synapse/issues/5047), [\#5073](https://github.com/matrix-org/synapse/issues/5073), [\#5116](https://github.com/matrix-org/synapse/issues/5116)) -- Add support for handling /verions, /voip and /push_rules client endpoints to client_reader worker. ([\#5063](https://github.com/matrix-org/synapse/issues/5063), [\#5065](https://github.com/matrix-org/synapse/issues/5065), [\#5070](https://github.com/matrix-org/synapse/issues/5070)) -- Add an configuration option to require authentication on /publicRooms and /profile endpoints. ([\#5083](https://github.com/matrix-org/synapse/issues/5083)) +- Add support for handling `/versions`, `/voip` and `/push_rules` client endpoints to client_reader worker. ([\#5063](https://github.com/matrix-org/synapse/issues/5063), [\#5065](https://github.com/matrix-org/synapse/issues/5065), [\#5070](https://github.com/matrix-org/synapse/issues/5070)) +- Add a configuration option to require authentication on /publicRooms and /profile endpoints. ([\#5083](https://github.com/matrix-org/synapse/issues/5083)) - Move admin APIs to `/_synapse/admin/v1`. (The old paths are retained for backwards-compatibility, for now). ([\#5119](https://github.com/matrix-org/synapse/issues/5119)) - Implement an admin API for sending server notices. Many thanks to @krombel who provided a foundation for this work. ([\#5121](https://github.com/matrix-org/synapse/issues/5121), [\#5142](https://github.com/matrix-org/synapse/issues/5142)) @@ -39,11 +45,9 @@ Bugfixes - Workaround bug in twisted where attempting too many concurrent DNS requests could cause it to hang due to running out of file descriptors. ([\#5037](https://github.com/matrix-org/synapse/issues/5037)) - Make sure we're not registering the same 3pid twice on registration. ([\#5071](https://github.com/matrix-org/synapse/issues/5071)) - Don't crash on lack of expiry templates. ([\#5077](https://github.com/matrix-org/synapse/issues/5077)) -- Fix the ratelimting on third party invites. ([\#5104](https://github.com/matrix-org/synapse/issues/5104)) +- Fix the ratelimiting on third party invites. ([\#5104](https://github.com/matrix-org/synapse/issues/5104)) - Add some missing limitations to room alias creation. ([\#5124](https://github.com/matrix-org/synapse/issues/5124), [\#5128](https://github.com/matrix-org/synapse/issues/5128)) - Limit the number of EDUs in transactions to 100 as expected by synapse. Thanks to @superboum for this work! ([\#5138](https://github.com/matrix-org/synapse/issues/5138)) -- Fix bogus imports in unit tests. ([\#5154](https://github.com/matrix-org/synapse/issues/5154)) - Internal Changes ---------------- @@ -78,6 +82,7 @@ Internal Changes - Prevent an exception from being raised in a IResolutionReceiver and use a more generic error message for blacklisted URL previews. ([\#5155](https://github.com/matrix-org/synapse/issues/5155)) - Run `black` on the tests directory. ([\#5170](https://github.com/matrix-org/synapse/issues/5170)) - Fix CI after new release of isort. ([\#5179](https://github.com/matrix-org/synapse/issues/5179)) +- Fix bogus imports in unit tests. ([\#5154](https://github.com/matrix-org/synapse/issues/5154)) Synapse 0.99.3.2 (2019-05-03) diff --git a/changelog.d/5181.feature b/changelog.d/5181.feature new file mode 100644 index 0000000000..5ce13aa2ea --- /dev/null +++ b/changelog.d/5181.feature @@ -0,0 +1 @@ +Ratelimiting configuration for clients sending messages and the federation server has been altered to match login ratelimiting. The old configuration names will continue working. Check the sample config for details of the new names. diff --git a/changelog.d/5183.misc b/changelog.d/5183.misc new file mode 100644 index 0000000000..a8970f29eb --- /dev/null +++ b/changelog.d/5183.misc @@ -0,0 +1 @@ +Allow client event serialization to be async. diff --git a/changelog.d/5184.misc b/changelog.d/5184.misc new file mode 100644 index 0000000000..1588bdef6c --- /dev/null +++ b/changelog.d/5184.misc @@ -0,0 +1 @@ +Expose DataStore._get_events as get_events_as_list. diff --git a/changelog.d/5185.misc b/changelog.d/5185.misc new file mode 100644 index 0000000000..d148b03b51 --- /dev/null +++ b/changelog.d/5185.misc @@ -0,0 +1 @@ +Update tests to consistently be configured via the same code that is used when loading from configuration files. diff --git a/changelog.d/5190.feature b/changelog.d/5190.feature new file mode 100644 index 0000000000..34904aa7a8 --- /dev/null +++ b/changelog.d/5190.feature @@ -0,0 +1 @@ +Drop support for the undocumented /_matrix/client/v2_alpha API prefix. diff --git a/debian/changelog b/debian/changelog index 454fa8eb16..35cf8ffb20 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,9 +1,12 @@ -matrix-synapse-py3 (0.99.3.2+nmu1) UNRELEASED; urgency=medium +matrix-synapse-py3 (0.99.4) stable; urgency=medium [ Christoph Müller ] * Configure the systemd units to have a log identifier of `matrix-synapse` - -- Christoph Müller <iblzm@hotmail.de> Wed, 17 Apr 2019 16:17:32 +0200 + [ Synapse Packaging team ] + * New synapse release 0.99.4. + + -- Synapse Packaging team <packages@matrix.org> Wed, 15 May 2019 13:58:08 +0100 matrix-synapse-py3 (0.99.3.2) stable; urgency=medium diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index c4e5c4cf39..09ee0e8984 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -446,21 +446,15 @@ log_config: "CONFDIR/SERVERNAME.log.config" ## Ratelimiting ## -# Number of messages a client can send per second -# -#rc_messages_per_second: 0.2 - -# Number of message a client can send before being throttled -# -#rc_message_burst_count: 10.0 - -# Ratelimiting settings for registration and login. +# Ratelimiting settings for client actions (registration, login, messaging). # # Each ratelimiting configuration is made of two parameters: # - per_second: number of requests a client can send per second. # - burst_count: number of requests a client can send before being throttled. # # Synapse currently uses the following configurations: +# - one for messages that ratelimits sending based on the account the client +# is using # - one for registration that ratelimits registration requests based on the # client's IP address. # - one for login that ratelimits login requests based on the client's IP @@ -473,6 +467,10 @@ log_config: "CONFDIR/SERVERNAME.log.config" # # The defaults are as shown below. # +#rc_message: +# per_second: 0.2 +# burst_count: 10 +# #rc_registration: # per_second: 0.17 # burst_count: 3 @@ -488,29 +486,28 @@ log_config: "CONFDIR/SERVERNAME.log.config" # per_second: 0.17 # burst_count: 3 -# The federation window size in milliseconds -# -#federation_rc_window_size: 1000 -# The number of federation requests from a single server in a window -# before the server will delay processing the request. +# Ratelimiting settings for incoming federation # -#federation_rc_sleep_limit: 10 - -# The duration in milliseconds to delay processing events from -# remote servers by if they go over the sleep limit. -# -#federation_rc_sleep_delay: 500 - -# The maximum number of concurrent federation requests allowed -# from a single server +# The rc_federation configuration is made up of the following settings: +# - window_size: window size in milliseconds +# - sleep_limit: number of federation requests from a single server in +# a window before the server will delay processing the request. +# - sleep_delay: duration in milliseconds to delay processing events +# from remote servers by if they go over the sleep limit. +# - reject_limit: maximum number of concurrent federation requests +# allowed from a single server +# - concurrent: number of federation requests to concurrently process +# from a single server # -#federation_rc_reject_limit: 50 - -# The number of federation requests to concurrently process from a -# single server +# The defaults are as shown below. # -#federation_rc_concurrent: 3 +#rc_federation: +# window_size: 1000 +# sleep_limit: 10 +# sleep_delay: 500 +# reject_limit: 50 +# concurrent: 3 # Target outgoing federation transaction frequency for sending read-receipts, # per-room. diff --git a/synapse/__init__.py b/synapse/__init__.py index cd9cfb2409..bf9e810da6 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -27,4 +27,4 @@ try: except ImportError: pass -__version__ = "0.99.4rc1" +__version__ = "0.99.4" diff --git a/synapse/api/urls.py b/synapse/api/urls.py index cb71d80875..3c6bddff7a 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -22,8 +22,7 @@ from six.moves.urllib.parse import urlencode from synapse.config import ConfigError -CLIENT_PREFIX = "/_matrix/client/api/v1" -CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha" +CLIENT_API_PREFIX = "/_matrix/client" FEDERATION_PREFIX = "/_matrix/federation" FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1" FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2" diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 5a68399e63..5a9adac480 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -16,16 +16,56 @@ from ._base import Config class RateLimitConfig(object): - def __init__(self, config): - self.per_second = config.get("per_second", 0.17) - self.burst_count = config.get("burst_count", 3.0) + def __init__(self, config, defaults={"per_second": 0.17, "burst_count": 3.0}): + self.per_second = config.get("per_second", defaults["per_second"]) + self.burst_count = config.get("burst_count", defaults["burst_count"]) -class RatelimitConfig(Config): +class FederationRateLimitConfig(object): + _items_and_default = { + "window_size": 10000, + "sleep_limit": 10, + "sleep_delay": 500, + "reject_limit": 50, + "concurrent": 3, + } + + def __init__(self, **kwargs): + for i in self._items_and_default.keys(): + setattr(self, i, kwargs.get(i) or self._items_and_default[i]) + +class RatelimitConfig(Config): def read_config(self, config): - self.rc_messages_per_second = config.get("rc_messages_per_second", 0.2) - self.rc_message_burst_count = config.get("rc_message_burst_count", 10.0) + + # Load the new-style messages config if it exists. Otherwise fall back + # to the old method. + if "rc_message" in config: + self.rc_message = RateLimitConfig( + config["rc_message"], defaults={"per_second": 0.2, "burst_count": 10.0} + ) + else: + self.rc_message = RateLimitConfig( + { + "per_second": config.get("rc_messages_per_second", 0.2), + "burst_count": config.get("rc_message_burst_count", 10.0), + } + ) + + # Load the new-style federation config, if it exists. Otherwise, fall + # back to the old method. + if "federation_rc" in config: + self.rc_federation = FederationRateLimitConfig(**config["rc_federation"]) + else: + self.rc_federation = FederationRateLimitConfig( + **{ + "window_size": config.get("federation_rc_window_size"), + "sleep_limit": config.get("federation_rc_sleep_limit"), + "sleep_delay": config.get("federation_rc_sleep_delay"), + "reject_limit": config.get("federation_rc_reject_limit"), + "concurrent": config.get("federation_rc_concurrent"), + } + ) self.rc_registration = RateLimitConfig(config.get("rc_registration", {})) @@ -33,38 +73,26 @@ class RatelimitConfig(Config): self.rc_login_address = RateLimitConfig(rc_login_config.get("address", {})) self.rc_login_account = RateLimitConfig(rc_login_config.get("account", {})) self.rc_login_failed_attempts = RateLimitConfig( - rc_login_config.get("failed_attempts", {}), + rc_login_config.get("failed_attempts", {}) ) - self.federation_rc_window_size = config.get("federation_rc_window_size", 1000) - self.federation_rc_sleep_limit = config.get("federation_rc_sleep_limit", 10) - self.federation_rc_sleep_delay = config.get("federation_rc_sleep_delay", 500) - self.federation_rc_reject_limit = config.get("federation_rc_reject_limit", 50) - self.federation_rc_concurrent = config.get("federation_rc_concurrent", 3) - self.federation_rr_transactions_per_room_per_second = config.get( - "federation_rr_transactions_per_room_per_second", 50, + "federation_rr_transactions_per_room_per_second", 50 ) def default_config(self, **kwargs): return """\ ## Ratelimiting ## - # Number of messages a client can send per second - # - #rc_messages_per_second: 0.2 - - # Number of message a client can send before being throttled - # - #rc_message_burst_count: 10.0 - - # Ratelimiting settings for registration and login. + # Ratelimiting settings for client actions (registration, login, messaging). # # Each ratelimiting configuration is made of two parameters: # - per_second: number of requests a client can send per second. # - burst_count: number of requests a client can send before being throttled. # # Synapse currently uses the following configurations: + # - one for messages that ratelimits sending based on the account the client + # is using # - one for registration that ratelimits registration requests based on the # client's IP address. # - one for login that ratelimits login requests based on the client's IP @@ -77,6 +105,10 @@ class RatelimitConfig(Config): # # The defaults are as shown below. # + #rc_message: + # per_second: 0.2 + # burst_count: 10 + # #rc_registration: # per_second: 0.17 # burst_count: 3 @@ -92,29 +124,28 @@ class RatelimitConfig(Config): # per_second: 0.17 # burst_count: 3 - # The federation window size in milliseconds - # - #federation_rc_window_size: 1000 - - # The number of federation requests from a single server in a window - # before the server will delay processing the request. - # - #federation_rc_sleep_limit: 10 - # The duration in milliseconds to delay processing events from - # remote servers by if they go over the sleep limit. + # Ratelimiting settings for incoming federation # - #federation_rc_sleep_delay: 500 - - # The maximum number of concurrent federation requests allowed - # from a single server + # The rc_federation configuration is made up of the following settings: + # - window_size: window size in milliseconds + # - sleep_limit: number of federation requests from a single server in + # a window before the server will delay processing the request. + # - sleep_delay: duration in milliseconds to delay processing events + # from remote servers by if they go over the sleep limit. + # - reject_limit: maximum number of concurrent federation requests + # allowed from a single server + # - concurrent: number of federation requests to concurrently process + # from a single server # - #federation_rc_reject_limit: 50 - - # The number of federation requests to concurrently process from a - # single server + # The defaults are as shown below. # - #federation_rc_concurrent: 3 + #rc_federation: + # window_size: 1000 + # sleep_limit: 10 + # sleep_delay: 500 + # reject_limit: 50 + # concurrent: 3 # Target outgoing federation transaction frequency for sending read-receipts, # per-room. diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 07fccdd8f9..a5454556cc 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -19,7 +19,10 @@ from six import string_types from frozendict import frozendict +from twisted.internet import defer + from synapse.api.constants import EventTypes +from synapse.util.async_helpers import yieldable_gather_results from . import EventBase @@ -311,3 +314,44 @@ def serialize_event(e, time_now_ms, as_client_event=True, d = only_fields(d, only_event_fields) return d + + +class EventClientSerializer(object): + """Serializes events that are to be sent to clients. + + This is used for bundling extra information with any events to be sent to + clients. + """ + + def __init__(self, hs): + pass + + def serialize_event(self, event, time_now, **kwargs): + """Serializes a single event. + + Args: + event (EventBase) + time_now (int): The current time in milliseconds + **kwargs: Arguments to pass to `serialize_event` + + Returns: + Deferred[dict]: The serialized event + """ + event = serialize_event(event, time_now, **kwargs) + return defer.succeed(event) + + def serialize_events(self, events, time_now, **kwargs): + """Serializes multiple events. + + Args: + event (iter[EventBase]) + time_now (int): The current time in milliseconds + **kwargs: Arguments to pass to `serialize_event` + + Returns: + Deferred[list[dict]]: The list of serialized events + """ + return yieldable_gather_results( + self.serialize_event, events, + time_now=time_now, **kwargs + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 9030eb18c5..385eda2dca 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -63,11 +63,7 @@ class TransportLayerServer(JsonResource): self.authenticator = Authenticator(hs) self.ratelimiter = FederationRateLimiter( self.clock, - window_size=hs.config.federation_rc_window_size, - sleep_limit=hs.config.federation_rc_sleep_limit, - sleep_msec=hs.config.federation_rc_sleep_delay, - reject_limit=hs.config.federation_rc_reject_limit, - concurrent_requests=hs.config.federation_rc_concurrent, + config=hs.config.rc_federation, ) self.register_servlets() diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ac09d03ba9..dca337ec61 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -90,8 +90,8 @@ class BaseHandler(object): messages_per_second = override.messages_per_second burst_count = override.burst_count else: - messages_per_second = self.hs.config.rc_messages_per_second - burst_count = self.hs.config.rc_message_burst_count + messages_per_second = self.hs.config.rc_message.per_second + burst_count = self.hs.config.rc_message.burst_count allowed, time_allowed = self.ratelimiter.can_do_action( user_id, time_now, diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1b4d8c74ae..6003ad9cca 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -21,7 +21,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase -from synapse.events.utils import serialize_event from synapse.types import UserID from synapse.util.logutils import log_function from synapse.visibility import filter_events_for_client @@ -50,6 +49,7 @@ class EventStreamHandler(BaseHandler): self.notifier = hs.get_notifier() self.state = hs.get_state_handler() self._server_notices_sender = hs.get_server_notices_sender() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks @log_function @@ -120,9 +120,9 @@ class EventStreamHandler(BaseHandler): time_now = self.clock.time_msec() - chunks = [ - serialize_event(e, time_now, as_client_event) for e in events - ] + chunks = yield self._event_serializer.serialize_events( + events, time_now, as_client_event=as_client_event, + ) chunk = { "chunk": chunks, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 7dfae78db0..aaee5db0b7 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -19,7 +19,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state from synapse.streams.config import PaginationConfig @@ -43,6 +42,7 @@ class InitialSyncHandler(BaseHandler): self.clock = hs.get_clock() self.validator = EventValidator() self.snapshot_cache = SnapshotCache() + self._event_serializer = hs.get_event_client_serializer() def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True, include_archived=False): @@ -138,7 +138,9 @@ class InitialSyncHandler(BaseHandler): d["inviter"] = event.sender invite_event = yield self.store.get_event(event.event_id) - d["invite"] = serialize_event(invite_event, time_now, as_client_event) + d["invite"] = yield self._event_serializer.serialize_event( + invite_event, time_now, as_client_event, + ) rooms_ret.append(d) @@ -185,18 +187,21 @@ class InitialSyncHandler(BaseHandler): time_now = self.clock.time_msec() d["messages"] = { - "chunk": [ - serialize_event(m, time_now, as_client_event) - for m in messages - ], + "chunk": ( + yield self._event_serializer.serialize_events( + messages, time_now=time_now, + as_client_event=as_client_event, + ) + ), "start": start_token.to_string(), "end": end_token.to_string(), } - d["state"] = [ - serialize_event(c, time_now, as_client_event) - for c in current_state.values() - ] + d["state"] = yield self._event_serializer.serialize_events( + current_state.values(), + time_now=time_now, + as_client_event=as_client_event + ) account_data_events = [] tags = tags_by_room.get(event.room_id) @@ -337,11 +342,15 @@ class InitialSyncHandler(BaseHandler): "membership": membership, "room_id": room_id, "messages": { - "chunk": [serialize_event(m, time_now) for m in messages], + "chunk": (yield self._event_serializer.serialize_events( + messages, time_now, + )), "start": start_token.to_string(), "end": end_token.to_string(), }, - "state": [serialize_event(s, time_now) for s in room_state.values()], + "state": (yield self._event_serializer.serialize_events( + room_state.values(), time_now, + )), "presence": [], "receipts": [], }) @@ -355,10 +364,9 @@ class InitialSyncHandler(BaseHandler): # TODO: These concurrently time_now = self.clock.time_msec() - state = [ - serialize_event(x, time_now) - for x in current_state.values() - ] + state = yield self._event_serializer.serialize_events( + current_state.values(), time_now, + ) now_token = yield self.hs.get_event_sources().get_current_token() @@ -425,7 +433,9 @@ class InitialSyncHandler(BaseHandler): ret = { "room_id": room_id, "messages": { - "chunk": [serialize_event(m, time_now) for m in messages], + "chunk": (yield self._event_serializer.serialize_events( + messages, time_now, + )), "start": start_token.to_string(), "end": end_token.to_string(), }, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e5afeadf68..7b2c33a922 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -32,7 +32,6 @@ from synapse.api.errors import ( ) from synapse.api.room_versions import RoomVersions from synapse.api.urls import ConsentURIBuilder -from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.storage.state import StateFilter @@ -57,6 +56,7 @@ class MessageHandler(object): self.clock = hs.get_clock() self.state = hs.get_state_handler() self.store = hs.get_datastore() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, @@ -164,9 +164,10 @@ class MessageHandler(object): room_state = room_state[membership_event_id] now = self.clock.time_msec() - defer.returnValue( - [serialize_event(c, now) for c in room_state.values()] + events = yield self._event_serializer.serialize_events( + room_state.values(), now, ) + defer.returnValue(events) @defer.inlineCallbacks def get_joined_members(self, requester, room_id): diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e4fdae9266..8f811e24fe 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -20,7 +20,6 @@ from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError -from synapse.events.utils import serialize_event from synapse.storage.state import StateFilter from synapse.types import RoomStreamToken from synapse.util.async_helpers import ReadWriteLock @@ -78,6 +77,7 @@ class PaginationHandler(object): self._purges_in_progress_by_room = set() # map from purge id to PurgeStatus self._purges_by_id = {} + self._event_serializer = hs.get_event_client_serializer() def start_purge_history(self, room_id, token, delete_local_events=False): @@ -278,18 +278,22 @@ class PaginationHandler(object): time_now = self.clock.time_msec() chunk = { - "chunk": [ - serialize_event(e, time_now, as_client_event) - for e in events - ], + "chunk": ( + yield self._event_serializer.serialize_events( + events, time_now, + as_client_event=as_client_event, + ) + ), "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), } if state: - chunk["state"] = [ - serialize_event(e, time_now, as_client_event) - for e in state - ] + chunk["state"] = ( + yield self._event_serializer.serialize_events( + state, time_now, + as_client_event=as_client_event, + ) + ) defer.returnValue(chunk) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 49c439313e..9bba74d6c9 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -23,7 +23,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter -from synapse.events.utils import serialize_event from synapse.storage.state import StateFilter from synapse.visibility import filter_events_for_client @@ -36,6 +35,7 @@ class SearchHandler(BaseHandler): def __init__(self, hs): super(SearchHandler, self).__init__(hs) + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def get_old_rooms_from_upgraded_room(self, room_id): @@ -401,14 +401,16 @@ class SearchHandler(BaseHandler): time_now = self.clock.time_msec() for context in contexts.values(): - context["events_before"] = [ - serialize_event(e, time_now) - for e in context["events_before"] - ] - context["events_after"] = [ - serialize_event(e, time_now) - for e in context["events_after"] - ] + context["events_before"] = ( + yield self._event_serializer.serialize_events( + context["events_before"], time_now, + ) + ) + context["events_after"] = ( + yield self._event_serializer.serialize_events( + context["events_after"], time_now, + ) + ) state_results = {} if include_state: @@ -422,14 +424,13 @@ class SearchHandler(BaseHandler): # We're now about to serialize the events. We should not make any # blocking calls after this. Otherwise the 'age' will be wrong - results = [ - { + results = [] + for e in allowed_events: + results.append({ "rank": rank_map[e.event_id], - "result": serialize_event(e, time_now), + "result": (yield self._event_serializer.serialize_event(e, time_now)), "context": contexts.get(e.event_id, {}), - } - for e in allowed_events - ] + }) rooms_cat_res = { "results": results, @@ -438,10 +439,13 @@ class SearchHandler(BaseHandler): } if state_results: - rooms_cat_res["state"] = { - room_id: [serialize_event(e, time_now) for e in state] - for room_id, state in state_results.items() - } + s = {} + for room_id, state in state_results.items(): + s[room_id] = yield self._event_serializer.serialize_events( + state, time_now, + ) + + rooms_cat_res["state"] = s if room_groups and "room_id" in group_keys: rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups diff --git a/synapse/rest/client/v1/base.py b/synapse/rest/client/v1/base.py index c77d7aba68..dc63b661c0 100644 --- a/synapse/rest/client/v1/base.py +++ b/synapse/rest/client/v1/base.py @@ -19,7 +19,7 @@ import logging import re -from synapse.api.urls import CLIENT_PREFIX +from synapse.api.urls import CLIENT_API_PREFIX from synapse.http.servlet import RestServlet from synapse.rest.client.transactions import HttpTransactionCache @@ -36,12 +36,12 @@ def client_path_patterns(path_regex, releases=(0,), include_in_unstable=True): Returns: SRE_Pattern """ - patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)] + patterns = [re.compile("^" + CLIENT_API_PREFIX + "/api/v1" + path_regex)] if include_in_unstable: - unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable") + unstable_prefix = CLIENT_API_PREFIX + "/unstable" patterns.append(re.compile("^" + unstable_prefix + path_regex)) for release in releases: - new_prefix = CLIENT_PREFIX.replace("/api/v1", "/r%d" % release) + new_prefix = CLIENT_API_PREFIX + "/r%d" % (release,) patterns.append(re.compile("^" + new_prefix + path_regex)) return patterns diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index cd9b3bdbd1..c3b0a39ab7 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -19,7 +19,6 @@ import logging from twisted.internet import defer from synapse.api.errors import SynapseError -from synapse.events.utils import serialize_event from synapse.streams.config import PaginationConfig from .base import ClientV1RestServlet, client_path_patterns @@ -84,6 +83,7 @@ class EventRestServlet(ClientV1RestServlet): super(EventRestServlet, self).__init__(hs) self.clock = hs.get_clock() self.event_handler = hs.get_event_handler() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request, event_id): @@ -92,7 +92,8 @@ class EventRestServlet(ClientV1RestServlet): time_now = self.clock.time_msec() if event: - defer.returnValue((200, serialize_event(event, time_now))) + event = yield self._event_serializer.serialize_event(event, time_now) + defer.returnValue((200, event)) else: defer.returnValue((404, "Event not found.")) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index fab04965cb..255a85c588 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -26,7 +26,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError from synapse.api.filtering import Filter -from synapse.events.utils import format_event_for_client_v2, serialize_event +from synapse.events.utils import format_event_for_client_v2 from synapse.http.servlet import ( assert_params_in_dict, parse_integer, @@ -537,6 +537,7 @@ class RoomEventServlet(ClientV1RestServlet): super(RoomEventServlet, self).__init__(hs) self.clock = hs.get_clock() self.event_handler = hs.get_event_handler() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): @@ -545,7 +546,8 @@ class RoomEventServlet(ClientV1RestServlet): time_now = self.clock.time_msec() if event: - defer.returnValue((200, serialize_event(event, time_now))) + event = yield self._event_serializer.serialize_event(event, time_now) + defer.returnValue((200, event)) else: defer.returnValue((404, "Event not found.")) @@ -559,6 +561,7 @@ class RoomEventContextServlet(ClientV1RestServlet): super(RoomEventContextServlet, self).__init__(hs) self.clock = hs.get_clock() self.room_context_handler = hs.get_room_context_handler() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): @@ -588,16 +591,18 @@ class RoomEventContextServlet(ClientV1RestServlet): ) time_now = self.clock.time_msec() - results["events_before"] = [ - serialize_event(event, time_now) for event in results["events_before"] - ] - results["event"] = serialize_event(results["event"], time_now) - results["events_after"] = [ - serialize_event(event, time_now) for event in results["events_after"] - ] - results["state"] = [ - serialize_event(event, time_now) for event in results["state"] - ] + results["events_before"] = yield self._event_serializer.serialize_events( + results["events_before"], time_now, + ) + results["event"] = yield self._event_serializer.serialize_event( + results["event"], time_now, + ) + results["events_after"] = yield self._event_serializer.serialize_events( + results["events_after"], time_now, + ) + results["state"] = yield self._event_serializer.serialize_events( + results["state"], time_now, + ) defer.returnValue((200, results)) diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index 77434937ff..24ac26bf03 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -21,13 +21,12 @@ import re from twisted.internet import defer from synapse.api.errors import InteractiveAuthIncompleteError -from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX +from synapse.api.urls import CLIENT_API_PREFIX logger = logging.getLogger(__name__) def client_v2_patterns(path_regex, releases=(0,), - v2_alpha=True, unstable=True): """Creates a regex compiled client path with the correct client path prefix. @@ -39,13 +38,11 @@ def client_v2_patterns(path_regex, releases=(0,), SRE_Pattern """ patterns = [] - if v2_alpha: - patterns.append(re.compile("^" + CLIENT_V2_ALPHA_PREFIX + path_regex)) if unstable: - unstable_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/unstable") + unstable_prefix = CLIENT_API_PREFIX + "/unstable" patterns.append(re.compile("^" + unstable_prefix + path_regex)) for release in releases: - new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release) + new_prefix = CLIENT_API_PREFIX + "/r%d" % (release,) patterns.append(re.compile("^" + new_prefix + path_regex)) return patterns diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index ac035c7735..4c380ab84d 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -19,7 +19,7 @@ from twisted.internet import defer from synapse.api.constants import LoginType from synapse.api.errors import SynapseError -from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX +from synapse.api.urls import CLIENT_API_PREFIX from synapse.http.server import finish_request from synapse.http.servlet import RestServlet, parse_string @@ -139,8 +139,8 @@ class AuthRestServlet(RestServlet): if stagetype == LoginType.RECAPTCHA: html = RECAPTCHA_TEMPLATE % { 'session': session, - 'myurl': "%s/auth/%s/fallback/web" % ( - CLIENT_V2_ALPHA_PREFIX, LoginType.RECAPTCHA + 'myurl': "%s/r0/auth/%s/fallback/web" % ( + CLIENT_API_PREFIX, LoginType.RECAPTCHA ), 'sitekey': self.hs.config.recaptcha_public_key, } @@ -159,8 +159,8 @@ class AuthRestServlet(RestServlet): self.hs.config.public_baseurl, self.hs.config.user_consent_version, ), - 'myurl': "%s/auth/%s/fallback/web" % ( - CLIENT_V2_ALPHA_PREFIX, LoginType.TERMS + 'myurl': "%s/r0/auth/%s/fallback/web" % ( + CLIENT_API_PREFIX, LoginType.TERMS ), } html_bytes = html.encode("utf8") @@ -203,8 +203,8 @@ class AuthRestServlet(RestServlet): else: html = RECAPTCHA_TEMPLATE % { 'session': session, - 'myurl': "%s/auth/%s/fallback/web" % ( - CLIENT_V2_ALPHA_PREFIX, LoginType.RECAPTCHA + 'myurl': "%s/r0/auth/%s/fallback/web" % ( + CLIENT_API_PREFIX, LoginType.RECAPTCHA ), 'sitekey': self.hs.config.recaptcha_public_key, } @@ -240,8 +240,8 @@ class AuthRestServlet(RestServlet): self.hs.config.public_baseurl, self.hs.config.user_consent_version, ), - 'myurl': "%s/auth/%s/fallback/web" % ( - CLIENT_V2_ALPHA_PREFIX, LoginType.TERMS + 'myurl': "%s/r0/auth/%s/fallback/web" % ( + CLIENT_API_PREFIX, LoginType.TERMS ), } html_bytes = html.encode("utf8") diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index 9b75bb1377..5a5be7c390 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) class DevicesRestServlet(RestServlet): - PATTERNS = client_v2_patterns("/devices$", v2_alpha=False) + PATTERNS = client_v2_patterns("/devices$") def __init__(self, hs): """ @@ -56,7 +56,7 @@ class DeleteDevicesRestServlet(RestServlet): API for bulk deletion of devices. Accepts a JSON object with a devices key which lists the device_ids to delete. Requires user interactive auth. """ - PATTERNS = client_v2_patterns("/delete_devices", v2_alpha=False) + PATTERNS = client_v2_patterns("/delete_devices") def __init__(self, hs): super(DeleteDevicesRestServlet, self).__init__() @@ -95,7 +95,7 @@ class DeleteDevicesRestServlet(RestServlet): class DeviceRestServlet(RestServlet): - PATTERNS = client_v2_patterns("/devices/(?P<device_id>[^/]*)$", v2_alpha=False) + PATTERNS = client_v2_patterns("/devices/(?P<device_id>[^/]*)$") def __init__(self, hs): """ diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py index 2a6ea3df5f..0a1eb0ae45 100644 --- a/synapse/rest/client/v2_alpha/notifications.py +++ b/synapse/rest/client/v2_alpha/notifications.py @@ -17,10 +17,7 @@ import logging from twisted.internet import defer -from synapse.events.utils import ( - format_event_for_client_v2_without_room_id, - serialize_event, -) +from synapse.events.utils import format_event_for_client_v2_without_room_id from synapse.http.servlet import RestServlet, parse_integer, parse_string from ._base import client_v2_patterns @@ -36,6 +33,7 @@ class NotificationsServlet(RestServlet): self.store = hs.get_datastore() self.auth = hs.get_auth() self.clock = hs.get_clock() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request): @@ -69,11 +67,11 @@ class NotificationsServlet(RestServlet): "profile_tag": pa["profile_tag"], "actions": pa["actions"], "ts": pa["received_ts"], - "event": serialize_event( + "event": (yield self._event_serializer.serialize_event( notif_events[pa["event_id"]], self.clock.time_msec(), event_format=format_event_for_client_v2_without_room_id, - ), + )), } if pa["room_id"] not in receipts_by_room: diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index ecec610859..fa0cedb8d4 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -31,6 +31,7 @@ from synapse.api.errors import ( SynapseError, UnrecognizedRequestError, ) +from synapse.config.ratelimiting import FederationRateLimitConfig from synapse.config.server import is_threepid_reserved from synapse.http.servlet import ( RestServlet, @@ -153,16 +154,18 @@ class UsernameAvailabilityRestServlet(RestServlet): self.registration_handler = hs.get_registration_handler() self.ratelimiter = FederationRateLimiter( hs.get_clock(), - # Time window of 2s - window_size=2000, - # Artificially delay requests if rate > sleep_limit/window_size - sleep_limit=1, - # Amount of artificial delay to apply - sleep_msec=1000, - # Error with 429 if more than reject_limit requests are queued - reject_limit=1, - # Allow 1 request at a time - concurrent_requests=1, + FederationRateLimitConfig( + # Time window of 2s + window_size=2000, + # Artificially delay requests if rate > sleep_limit/window_size + sleep_limit=1, + # Amount of artificial delay to apply + sleep_msec=1000, + # Error with 429 if more than reject_limit requests are queued + reject_limit=1, + # Allow 1 request at a time + concurrent_requests=1, + ) ) @defer.inlineCallbacks diff --git a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py index 3db7ff8d1b..62b8de71fa 100644 --- a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py +++ b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py @@ -50,7 +50,6 @@ class RoomUpgradeRestServlet(RestServlet): PATTERNS = client_v2_patterns( # /rooms/$roomid/upgrade "/rooms/(?P<room_id>[^/]*)/upgrade$", - v2_alpha=False, ) def __init__(self, hs): diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index a9e9a47a0b..21e9cef2d0 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -29,7 +29,6 @@ logger = logging.getLogger(__name__) class SendToDeviceRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns( "/sendToDevice/(?P<message_type>[^/]*)/(?P<txn_id>[^/]*)$", - v2_alpha=False ) def __init__(self, hs): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 39d157a44b..c701e534e7 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -26,7 +26,6 @@ from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection from synapse.events.utils import ( format_event_for_client_v2_without_room_id, format_event_raw, - serialize_event, ) from synapse.handlers.presence import format_user_presence_state from synapse.handlers.sync import SyncConfig @@ -86,6 +85,7 @@ class SyncRestServlet(RestServlet): self.filtering = hs.get_filtering() self.presence_handler = hs.get_presence_handler() self._server_notices_sender = hs.get_server_notices_sender() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request): @@ -168,14 +168,14 @@ class SyncRestServlet(RestServlet): ) time_now = self.clock.time_msec() - response_content = self.encode_response( + response_content = yield self.encode_response( time_now, sync_result, requester.access_token_id, filter ) defer.returnValue((200, response_content)) - @staticmethod - def encode_response(time_now, sync_result, access_token_id, filter): + @defer.inlineCallbacks + def encode_response(self, time_now, sync_result, access_token_id, filter): if filter.event_format == 'client': event_formatter = format_event_for_client_v2_without_room_id elif filter.event_format == 'federation': @@ -183,24 +183,24 @@ class SyncRestServlet(RestServlet): else: raise Exception("Unknown event format %s" % (filter.event_format, )) - joined = SyncRestServlet.encode_joined( + joined = yield self.encode_joined( sync_result.joined, time_now, access_token_id, filter.event_fields, event_formatter, ) - invited = SyncRestServlet.encode_invited( + invited = yield self.encode_invited( sync_result.invited, time_now, access_token_id, event_formatter, ) - archived = SyncRestServlet.encode_archived( + archived = yield self.encode_archived( sync_result.archived, time_now, access_token_id, filter.event_fields, event_formatter, ) - return { + defer.returnValue({ "account_data": {"events": sync_result.account_data}, "to_device": {"events": sync_result.to_device}, "device_lists": { @@ -222,7 +222,7 @@ class SyncRestServlet(RestServlet): }, "device_one_time_keys_count": sync_result.device_one_time_keys_count, "next_batch": sync_result.next_batch.to_string(), - } + }) @staticmethod def encode_presence(events, time_now): @@ -239,8 +239,8 @@ class SyncRestServlet(RestServlet): ] } - @staticmethod - def encode_joined(rooms, time_now, token_id, event_fields, event_formatter): + @defer.inlineCallbacks + def encode_joined(self, rooms, time_now, token_id, event_fields, event_formatter): """ Encode the joined rooms in a sync result @@ -261,15 +261,15 @@ class SyncRestServlet(RestServlet): """ joined = {} for room in rooms: - joined[room.room_id] = SyncRestServlet.encode_room( + joined[room.room_id] = yield self.encode_room( room, time_now, token_id, joined=True, only_fields=event_fields, event_formatter=event_formatter, ) - return joined + defer.returnValue(joined) - @staticmethod - def encode_invited(rooms, time_now, token_id, event_formatter): + @defer.inlineCallbacks + def encode_invited(self, rooms, time_now, token_id, event_formatter): """ Encode the invited rooms in a sync result @@ -289,7 +289,7 @@ class SyncRestServlet(RestServlet): """ invited = {} for room in rooms: - invite = serialize_event( + invite = yield self._event_serializer.serialize_event( room.invite, time_now, token_id=token_id, event_format=event_formatter, is_invite=True, @@ -302,10 +302,10 @@ class SyncRestServlet(RestServlet): "invite_state": {"events": invited_state} } - return invited + defer.returnValue(invited) - @staticmethod - def encode_archived(rooms, time_now, token_id, event_fields, event_formatter): + @defer.inlineCallbacks + def encode_archived(self, rooms, time_now, token_id, event_fields, event_formatter): """ Encode the archived rooms in a sync result @@ -326,17 +326,17 @@ class SyncRestServlet(RestServlet): """ joined = {} for room in rooms: - joined[room.room_id] = SyncRestServlet.encode_room( + joined[room.room_id] = yield self.encode_room( room, time_now, token_id, joined=False, only_fields=event_fields, event_formatter=event_formatter, ) - return joined + defer.returnValue(joined) - @staticmethod + @defer.inlineCallbacks def encode_room( - room, time_now, token_id, joined, + self, room, time_now, token_id, joined, only_fields, event_formatter, ): """ @@ -355,9 +355,10 @@ class SyncRestServlet(RestServlet): Returns: dict[str, object]: the room, encoded in our response format """ - def serialize(event): - return serialize_event( - event, time_now, token_id=token_id, + def serialize(events): + return self._event_serializer.serialize_events( + events, time_now=time_now, + token_id=token_id, event_format=event_formatter, only_event_fields=only_fields, ) @@ -376,8 +377,8 @@ class SyncRestServlet(RestServlet): event.event_id, room.room_id, event.room_id, ) - serialized_state = [serialize(e) for e in state_events] - serialized_timeline = [serialize(e) for e in timeline_events] + serialized_state = yield serialize(state_events) + serialized_timeline = yield serialize(timeline_events) account_data = room.account_data @@ -397,7 +398,7 @@ class SyncRestServlet(RestServlet): result["unread_notifications"] = room.unread_notifications result["summary"] = room.summary - return result + defer.returnValue(result) def register_servlets(hs, http_server): diff --git a/synapse/server.py b/synapse/server.py index 8c30ac2fa5..80d40b9272 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -35,6 +35,7 @@ from synapse.crypto import context_factory from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory from synapse.events.spamcheck import SpamChecker +from synapse.events.utils import EventClientSerializer from synapse.federation.federation_client import FederationClient from synapse.federation.federation_server import ( FederationHandlerRegistry, @@ -185,6 +186,7 @@ class HomeServer(object): 'sendmail', 'registration_handler', 'account_validity_handler', + 'event_client_serializer', ] REQUIRED_ON_MASTER_STARTUP = [ @@ -511,6 +513,9 @@ class HomeServer(object): def build_account_validity_handler(self): return AccountValidityHandler(self) + def build_event_client_serializer(self): + return EventClientSerializer(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 6092f600ba..eb329ebd8b 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -302,7 +302,7 @@ class ApplicationServiceTransactionWorkerStore( event_ids = json.loads(entry["event_ids"]) - events = yield self._get_events(event_ids) + events = yield self.get_events_as_list(event_ids) defer.returnValue( AppServiceTransaction(service=service, id=entry["txn_id"], events=events) @@ -358,7 +358,7 @@ class ApplicationServiceTransactionWorkerStore( "get_new_events_for_appservice", get_new_events_for_appservice_txn ) - events = yield self._get_events(event_ids) + events = yield self.get_events_as_list(event_ids) defer.returnValue((upper_bound, events)) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 956f876572..09e39c2c28 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -45,7 +45,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas """ return self.get_auth_chain_ids( event_ids, include_given=include_given - ).addCallback(self._get_events) + ).addCallback(self.get_events_as_list) def get_auth_chain_ids(self, event_ids, include_given=False): """Get auth events for given event_ids. The events *must* be state events. @@ -316,7 +316,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_list, limit, ) - .addCallback(self._get_events) + .addCallback(self.get_events_as_list) .addCallback(lambda l: sorted(l, key=lambda e: -e.depth)) ) @@ -382,7 +382,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas latest_events, limit, ) - events = yield self._get_events(ids) + events = yield self.get_events_as_list(ids) defer.returnValue(events) def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 663991a9b6..adc6cf26b5 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -103,7 +103,7 @@ class EventsWorkerStore(SQLBaseStore): Returns: Deferred : A FrozenEvent. """ - events = yield self._get_events( + events = yield self.get_events_as_list( [event_id], check_redacted=check_redacted, get_prev_content=get_prev_content, @@ -142,7 +142,7 @@ class EventsWorkerStore(SQLBaseStore): Returns: Deferred : Dict from event_id to event. """ - events = yield self._get_events( + events = yield self.get_events_as_list( event_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, @@ -152,13 +152,32 @@ class EventsWorkerStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) @defer.inlineCallbacks - def _get_events( + def get_events_as_list( self, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False, ): + """Get events from the database and return in a list in the same order + as given by `event_ids` arg. + + Args: + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred[list[EventBase]]: List of events fetched from the database. The + events are in the same order as `event_ids` arg. + + Note that the returned list may be smaller than the list of event + IDs if not all events could be fetched. + """ + if not event_ids: defer.returnValue([]) @@ -202,21 +221,22 @@ class EventsWorkerStore(SQLBaseStore): # # The problem is that we end up at this point when an event # which has been redacted is pulled out of the database by - # _enqueue_events, because _enqueue_events needs to check the - # redaction before it can cache the redacted event. So obviously, - # calling get_event to get the redacted event out of the database - # gives us an infinite loop. + # _enqueue_events, because _enqueue_events needs to check + # the redaction before it can cache the redacted event. So + # obviously, calling get_event to get the redacted event out + # of the database gives us an infinite loop. # - # For now (quick hack to fix during 0.99 release cycle), we just - # go and fetch the relevant row from the db, but it would be nice - # to think about how we can cache this rather than hit the db - # every time we access a redaction event. + # For now (quick hack to fix during 0.99 release cycle), we + # just go and fetch the relevant row from the db, but it + # would be nice to think about how we can cache this rather + # than hit the db every time we access a redaction event. # # One thought on how to do this: - # 1. split _get_events up so that it is divided into (a) get the - # rawish event from the db/cache, (b) do the redaction/rejection - # filtering - # 2. have _get_event_from_row just call the first half of that + # 1. split get_events_as_list up so that it is divided into + # (a) get the rawish event from the db/cache, (b) do the + # redaction/rejection filtering + # 2. have _get_event_from_row just call the first half of + # that orig_sender = yield self._simple_select_one_onecol( table="events", diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 226f8f1b7e..ff49eaae02 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -460,7 +460,7 @@ class SearchStore(BackgroundUpdateStore): results = list(filter(lambda row: row["room_id"] in room_ids, results)) - events = yield self._get_events([r["event_id"] for r in results]) + events = yield self.get_events_as_list([r["event_id"] for r in results]) event_map = {ev.event_id: ev for ev in events} @@ -605,7 +605,7 @@ class SearchStore(BackgroundUpdateStore): results = list(filter(lambda row: row["room_id"] in room_ids, results)) - events = yield self._get_events([r["event_id"] for r in results]) + events = yield self.get_events_as_list([r["event_id"] for r in results]) event_map = {ev.event_id: ev for ev in events} diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 9cd1e0f9fe..d105b6b17d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -319,7 +319,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = yield self.runInteraction("get_room_events_stream_for_room", f) - ret = yield self._get_events([r.event_id for r in rows], get_prev_content=True) + ret = yield self.get_events_as_list([ + r.event_id for r in rows], get_prev_content=True, + ) self._set_before_and_after(ret, rows, topo_order=from_id is None) @@ -367,7 +369,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = yield self.runInteraction("get_membership_changes_for_user", f) - ret = yield self._get_events([r.event_id for r in rows], get_prev_content=True) + ret = yield self.get_events_as_list( + [r.event_id for r in rows], get_prev_content=True, + ) self._set_before_and_after(ret, rows, topo_order=False) @@ -394,7 +398,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) logger.debug("stream before") - events = yield self._get_events( + events = yield self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) logger.debug("stream after") @@ -580,11 +584,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event_filter, ) - events_before = yield self._get_events( + events_before = yield self.get_events_as_list( [e for e in results["before"]["event_ids"]], get_prev_content=True ) - events_after = yield self._get_events( + events_after = yield self.get_events_as_list( [e for e in results["after"]["event_ids"]], get_prev_content=True ) @@ -697,7 +701,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_all_new_events_stream", get_all_new_events_stream_txn ) - events = yield self._get_events(event_ids) + events = yield self.get_events_as_list(event_ids) defer.returnValue((upper_bound, events)) @@ -849,7 +853,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event_filter, ) - events = yield self._get_events( + events = yield self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 2f16f23d91..7253ba120f 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -156,6 +156,25 @@ def concurrently_execute(func, args, limit): ], consumeErrors=True)).addErrback(unwrapFirstError) +def yieldable_gather_results(func, iter, *args, **kwargs): + """Executes the function with each argument concurrently. + + Args: + func (func): Function to execute that returns a Deferred + iter (iter): An iterable that yields items that get passed as the first + argument to the function + *args: Arguments to be passed to each call to func + + Returns + Deferred[list]: Resolved when all functions have been invoked, or errors if + one of the function calls fails. + """ + return logcontext.make_deferred_yieldable(defer.gatherResults([ + run_in_background(func, item, *args, **kwargs) + for item in iter + ], consumeErrors=True)).addErrback(unwrapFirstError) + + class Linearizer(object): """Limits concurrent access to resources based on a key. Useful to ensure only a few things happen at a time on a given resource. diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 194da87639..e14c8bdfda 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -27,6 +27,7 @@ def user_left_room(distributor, user, room_id): distributor.fire("user_left_room", user=user, room_id=room_id) +# XXX: this is no longer used. We should probably kill it. def user_joined_room(distributor, user, room_id): distributor.fire("user_joined_room", user=user, room_id=room_id) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 7deb38f2a7..b146d137f4 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -30,31 +30,14 @@ logger = logging.getLogger(__name__) class FederationRateLimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): + def __init__(self, clock, config): """ Args: clock (Clock) - window_size (int): The window size in milliseconds. - sleep_limit (int): The number of requests received in the last - `window_size` milliseconds before we artificially start - delaying processing of requests. - sleep_msec (int): The number of milliseconds to delay processing - of incoming requests by. - reject_limit (int): The maximum number of requests that are can be - queued for processing before we start rejecting requests with - a 429 Too Many Requests response. - concurrent_requests (int): The number of concurrent requests to - process. + config (FederationRateLimitConfig) """ self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - + self._config = config self.ratelimiters = {} def ratelimit(self, host): @@ -76,25 +59,25 @@ class FederationRateLimiter(object): host, _PerHostRatelimiter( clock=self.clock, - window_size=self.window_size, - sleep_limit=self.sleep_limit, - sleep_msec=self.sleep_msec, - reject_limit=self.reject_limit, - concurrent_requests=self.concurrent_requests, + config=self._config, ) ).ratelimit() class _PerHostRatelimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): + def __init__(self, clock, config): + """ + Args: + clock (Clock) + config (FederationRateLimitConfig) + """ self.clock = clock - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_sec = sleep_msec / 1000.0 - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests + self.window_size = config.window_size + self.sleep_limit = config.sleep_limit + self.sleep_sec = config.sleep_delay / 1000.0 + self.reject_limit = config.reject_limit + self.concurrent_requests = config.concurrent # request_id objects for requests which have been slept self.sleeping_requests = set() diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 3f0083831b..25a6c89ef5 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -340,7 +340,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store._get_events = Mock(return_value=events) + self.store.get_events_as_list = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) diff --git a/tests/utils.py b/tests/utils.py index f21074ae28..200c1ceabe 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -134,10 +134,6 @@ def default_config(name, parse=False): "email_enable_notifs": False, "block_non_admin_invites": False, "federation_domain_whitelist": None, - "federation_rc_reject_limit": 10, - "federation_rc_sleep_limit": 10, - "federation_rc_sleep_delay": 100, - "federation_rc_concurrent": 10, "filter_timeline_limit": 5000, "user_directory_search_all_users": False, "user_consent_server_notice_content": None, @@ -156,6 +152,12 @@ def default_config(name, parse=False): "mau_stats_only": False, "mau_limits_reserved_threepids": [], "admin_contact": None, + "rc_federation": { + "reject_limit": 10, + "sleep_limit": 10, + "sleep_delay": 10, + "concurrent": 10, + }, "rc_message": {"per_second": 10000, "burst_count": 10000}, "rc_registration": {"per_second": 10000, "burst_count": 10000}, "rc_login": { @@ -374,12 +376,7 @@ def register_federation_servlets(hs, resource): resource=resource, authenticator=federation_server.Authenticator(hs), ratelimiter=FederationRateLimiter( - hs.get_clock(), - window_size=hs.config.federation_rc_window_size, - sleep_limit=hs.config.federation_rc_sleep_limit, - sleep_msec=hs.config.federation_rc_sleep_delay, - reject_limit=hs.config.federation_rc_reject_limit, - concurrent_requests=hs.config.federation_rc_concurrent, + hs.get_clock(), config=hs.config.rc_federation ), ) |