From 0cb169900ebd39b6f46dbff1b1909cc5b3c17493 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Tue, 11 Aug 2020 16:08:10 +0100
Subject: Implement login blocking based on SAML attributes (#8052)

Hopefully this mostly speaks for itself. I also did a bit of cleaning up
of the error handling.

Fixes #8047
---
 changelog.d/8052.feature | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/8052.feature

diff --git a/changelog.d/8052.feature b/changelog.d/8052.feature
new file mode 100644
index 0000000000..6aa020c764
--- /dev/null
+++ b/changelog.d/8052.feature
@@ -0,0 +1 @@
+Allow login to be blocked based on the values of SAML attributes.

From db131b6b22350ea73128ac69ea82a48a6cf78a20 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 11 Aug 2020 18:09:46 +0100
Subject: Change the default log config to reduce disk I/O and storage (#8040)

* Change default log config to buffer by default.

  This batches up writes to the filesystem, which is more efficient for
  disk I/O. This means that it can take some time for logs to get written
  to disk. Note that ERROR logs (and above) immediately flush the buffer.

  This only affects new installs, as we only write the log config if
  started with `--generate-config` (in the same way we do for generating
  signing keys).

* Default to keeping the last 4 days of logs.

  This hopefully reduces the volume of logs kept for new servers. Keeping
  the last 1GB of logs is likely overkill for new servers, but equally may
  not be enough for busy ones. Instead, we keep the last four days' worth
  of logs, enough that admins can investigate any problems that happened
  over e.g. a long weekend.
---
 changelog.d/8040.misc       |  1 +
 docs/sample_log_config.yaml | 41 ++++++++++++++++++++++++++++++++++++-----
 synapse/config/logger.py    | 41 ++++++++++++++++++++++++++++++++++++-----
 3 files changed, 73 insertions(+), 10 deletions(-)
 create mode 100644 changelog.d/8040.misc

diff --git a/changelog.d/8040.misc b/changelog.d/8040.misc
new file mode 100644
index 0000000000..a126151392
--- /dev/null
+++ b/changelog.d/8040.misc
@@ -0,0 +1 @@
+Change the default log config to reduce disk I/O and storage for new servers.
diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml
index 1a2739455e..403ac005ee 100644
--- a/docs/sample_log_config.yaml
+++ b/docs/sample_log_config.yaml
@@ -18,13 +18,29 @@ filters:
 handlers:
     file:
-        class: logging.handlers.RotatingFileHandler
+        class: logging.handlers.TimedRotatingFileHandler
         formatter: precise
         filename: /var/log/matrix-synapse/homeserver.log
-        maxBytes: 104857600
-        backupCount: 10
-        filters: [context]
+        when: midnight
+        backupCount: 3  # Does not include the current log file.
         encoding: utf8
+
+    # Default to buffering writes to the log file for efficiency. This means
+    # that there will be a delay before INFO/DEBUG logs get written, but
+    # WARNING/ERROR logs will still be flushed immediately.
+    buffer:
+        class: logging.handlers.MemoryHandler
+        filters: [context]
+        target: file
+        # The capacity is the number of log lines that are buffered before
+        # being written to disk. Increasing this will lead to better
+        # performance, at the expense of it taking longer for log lines to
+        # be written to disk.
+        capacity: 10
+        flushLevel: 30  # Flush for WARNING logs as well
+
+    # A handler that writes logs to stderr. Unused by default, but can be used
+    # instead of "buffer" and "file" in the logger handlers.
     console:
         class: logging.StreamHandler
         formatter: precise
@@ -36,8 +52,23 @@ loggers:
         # information such as access tokens.
         level: INFO
 
+    twisted:
+        # We send the twisted logging directly to the file handler,
+        # to work around https://github.com/matrix-org/synapse/issues/3471
+        # when using the "buffer" handler. Use "console" to log to stderr instead.
+        handlers: [file]
+        propagate: false
+
 root:
     level: INFO
-    handlers: [file, console]
+
+    # Write logs to the `buffer` handler, which will buffer them together in memory,
+    # then write them to a file.
+    #
+    # Replace "buffer" with "console" to log to stderr instead. (Note that you'll
+    # also need to update the configuration for the `twisted` logger above, in
+    # this case.)
+    #
+    handlers: [buffer]
 
 disable_existing_loggers: false
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index dd775a97e8..493e98462d 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -62,13 +62,29 @@ filters:
 handlers:
     file:
-        class: logging.handlers.RotatingFileHandler
+        class: logging.handlers.TimedRotatingFileHandler
         formatter: precise
         filename: ${log_file}
-        maxBytes: 104857600
-        backupCount: 10
-        filters: [context]
+        when: midnight
+        backupCount: 3  # Does not include the current log file.
         encoding: utf8
+
+    # Default to buffering writes to the log file for efficiency. This means
+    # that there will be a delay before INFO/DEBUG logs get written, but
+    # WARNING/ERROR logs will still be flushed immediately.
+    buffer:
+        class: logging.handlers.MemoryHandler
+        filters: [context]
+        target: file
+        # The capacity is the number of log lines that are buffered before
+        # being written to disk. Increasing this will lead to better
+        # performance, at the expense of it taking longer for log lines to
+        # be written to disk.
+        capacity: 10
+        flushLevel: 30  # Flush for WARNING logs as well
+
+    # A handler that writes logs to stderr. Unused by default, but can be used
+    # instead of "buffer" and "file" in the logger handlers.
     console:
         class: logging.StreamHandler
         formatter: precise
@@ -80,9 +96,24 @@ loggers:
         # information such as access tokens.
         level: INFO
 
+    twisted:
+        # We send the twisted logging directly to the file handler,
+        # to work around https://github.com/matrix-org/synapse/issues/3471
+        # when using the "buffer" handler. Use "console" to log to stderr instead.
+        handlers: [file]
+        propagate: false
+
 root:
     level: INFO
-    handlers: [file, console]
+
+    # Write logs to the `buffer` handler, which will buffer them together in memory,
+    # then write them to a file.
+    #
+    # Replace "buffer" with "console" to log to stderr instead. (Note that you'll
+    # also need to update the configuration for the `twisted` logger above, in
+    # this case.)
+    #
+    handlers: [buffer]
 
 disable_existing_loggers: false
"""

From a0f574f3c291374c11b7b1cac8fc7c4fbb6ef7b2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 11 Aug 2020 18:10:07 +0100
Subject: Reduce INFO logging (#8050)

c.f. #8021

A lot of the code here changes the `Completed 200 OK` logging to include
the request URI, so that we can drop the `Sending request...` log line.

Some notes:

1. We won't log retries, which may be confusing given that the time-taken
   log line includes retries and sleeps.
2. Requests made via `_send_request_with_optional_trailing_slash` will
   always be logged *without* the trailing slash, even if they only
   succeeded with the trailing slash.
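A rough sketch of the retry behaviour behind note 2 (an editorial illustration, not Synapse's actual implementation: `send` and `RequestFailed` are hypothetical stand-ins, and the real client only retries a 400 that carries an `M_UNRECOGNIZED` error code):

    import logging

    logger = logging.getLogger(__name__)

    class RequestFailed(Exception):
        """Stand-in for an HTTP-level failure carrying a status code."""

        def __init__(self, code: int):
            super().__init__(code)
            self.code = code

    async def send_with_optional_trailing_slash(send, path: str):
        """Try `path`; on a 400 response, retry once with a trailing slash."""
        try:
            response = await send(path)
        except RequestFailed as e:
            if e.code != 400:
                raise
            logger.info("Retrying request with trailing slash")
            response = await send(path + "/")
        # The completion log always uses the path the caller asked for, so a
        # request that only succeeded after the retry is still reported
        # without the slash:
        logger.info("Completed request - GET %s", path)
        return response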
---
 changelog.d/8050.misc                              |  1 +
 synapse/http/client.py                             |  2 +-
 synapse/http/federation/matrix_federation_agent.py |  2 +-
 synapse/http/matrixfederationclient.py             | 94 ++++++++++++++++------
 4 files changed, 74 insertions(+), 25 deletions(-)
 create mode 100644 changelog.d/8050.misc

diff --git a/changelog.d/8050.misc b/changelog.d/8050.misc
new file mode 100644
index 0000000000..cc8d1af7fa
--- /dev/null
+++ b/changelog.d/8050.misc
@@ -0,0 +1 @@
+Reduce amount of outbound request logging at INFO level.
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 529532a063..8aeb70cdec 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -297,7 +297,7 @@ class SimpleHttpClient(object):
         outgoing_requests_counter.labels(method).inc()
 
         # log request but strip `access_token` (AS requests for example include this)
-        logger.info("Sending request %s %s", method, redact_uri(uri))
+        logger.debug("Sending request %s %s", method, redact_uri(uri))
 
         with start_active_span(
             "outgoing-client-request",
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 0c02648015..369bf9c2fc 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -247,7 +247,7 @@ class MatrixHostnameEndpoint(object):
         port = server.port
 
         try:
-            logger.info("Connecting to %s:%i", host.decode("ascii"), port)
+            logger.debug("Connecting to %s:%i", host.decode("ascii"), port)
             endpoint = HostnameEndpoint(self._reactor, host, port)
             if self._tls_options:
                 endpoint = wrapClientTLS(self._tls_options, endpoint)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 2a6373937a..738be43f46 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -29,10 +29,11 @@ from zope.interface import implementer
 
 from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
-from twisted.internet.interfaces import IReactorPluggableNameResolver
+from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime
 from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
 from twisted.web.http_headers import Headers
+from twisted.web.iweb import IResponse
 
 import synapse.metrics
 import synapse.util.retryutils
@@ -74,7 +75,7 @@ MAXINT = sys.maxsize
 _next_id = 1
 
 
-@attr.s
+@attr.s(frozen=True)
 class MatrixFederationRequest(object):
     method = attr.ib()
     """HTTP method
@@ -110,26 +111,52 @@ class MatrixFederationRequest(object):
     :type: str|None
     """
 
+    uri = attr.ib(init=False, type=bytes)
+    """The URI of this request
+    """
+
     def __attrs_post_init__(self):
         global _next_id
-        self.txn_id = "%s-O-%s" % (self.method, _next_id)
+        txn_id = "%s-O-%s" % (self.method, _next_id)
         _next_id = (_next_id + 1) % (MAXINT - 1)
 
+        object.__setattr__(self, "txn_id", txn_id)
+
+        destination_bytes = self.destination.encode("ascii")
+        path_bytes = self.path.encode("ascii")
+        if self.query:
+            query_bytes = encode_query_args(self.query)
+        else:
+            query_bytes = b""
+
+        # The object is frozen so we can pre-compute this.
+        uri = urllib.parse.urlunparse(
+            (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+        )
+
+        object.__setattr__(self, "uri", uri)
+
     def get_json(self):
         if self.json_callback:
             return self.json_callback()
         return self.json
 
 
-async def _handle_json_response(reactor, timeout_sec, request, response):
+async def _handle_json_response(
+    reactor: IReactorTime,
+    timeout_sec: float,
+    request: MatrixFederationRequest,
+    response: IResponse,
+    start_ms: int,
+):
     """
     Reads the JSON body of a response, with a timeout
 
     Args:
-        reactor (IReactor): twisted reactor, for the timeout
-        timeout_sec (float): number of seconds to wait for response to complete
-        request (MatrixFederationRequest): the request that triggered the response
-        response (IResponse): response to the request
+        reactor: twisted reactor, for the timeout
+        timeout_sec: number of seconds to wait for response to complete
+        request: the request that triggered the response
+        response: response to the request
+        start_ms: Timestamp when request was made
 
     Returns:
         dict: parsed JSON response
@@ -143,23 +170,35 @@ async def _handle_json_response(reactor, timeout_sec, request, response):
         body = await make_deferred_yieldable(d)
     except TimeoutError as e:
         logger.warning(
-            "{%s} [%s] Timed out reading response", request.txn_id, request.destination,
+            "{%s} [%s] Timed out reading response - %s %s",
+            request.txn_id,
+            request.destination,
+            request.method,
+            request.uri.decode("ascii"),
         )
         raise RequestSendFailed(e, can_retry=True) from e
     except Exception as e:
         logger.warning(
-            "{%s} [%s] Error reading response: %s",
+            "{%s} [%s] Error reading response %s %s: %s",
            request.txn_id,
             request.destination,
+            request.method,
+            request.uri.decode("ascii"),
             e,
         )
         raise
+
+    time_taken_secs = reactor.seconds() - start_ms / 1000
+
     logger.info(
-        "{%s} [%s] Completed: %d %s",
+        "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
         request.txn_id,
         request.destination,
         response.code,
         response.phrase.decode("ascii", errors="replace"),
+        time_taken_secs,
+        request.method,
+        request.uri.decode("ascii"),
     )
     return body
@@ -261,7 +300,9 @@ class MatrixFederationHttpClient(object):
             # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
             # trailing slash on Synapse <= v0.99.3.
logger.info("Retrying request with trailing slash") - request.path += "/" + + # Request is frozen so we create a new instance + request = attr.evolve(request, path=request.path + "/") response = await self._send_request(request, **send_request_args) @@ -373,9 +414,7 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - url_bytes = urllib.parse.urlunparse( - (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") - ) + url_bytes = request.uri url_str = url_bytes.decode("ascii") url_to_sign_bytes = urllib.parse.urlunparse( @@ -402,7 +441,7 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers - logger.info( + logger.debug( "{%s} [%s] Sending request: %s %s; timeout %fs", request.txn_id, request.destination, @@ -436,7 +475,6 @@ class MatrixFederationHttpClient(object): except DNSLookupError as e: raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e except Exception as e: - logger.info("Failed to send request: %s", e) raise RequestSendFailed(e, can_retry=True) from e incoming_responses_counter.labels( @@ -496,7 +534,7 @@ class MatrixFederationHttpClient(object): break except RequestSendFailed as e: - logger.warning( + logger.info( "{%s} [%s] Request failed: %s %s: %s", request.txn_id, request.destination, @@ -654,6 +692,8 @@ class MatrixFederationHttpClient(object): json=data, ) + start_ms = self.clock.time_msec() + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, @@ -664,7 +704,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -720,6 +760,8 @@ class MatrixFederationHttpClient(object): method="POST", destination=destination, path=path, query=args, json=data ) + start_ms = self.clock.time_msec() + response = await self._send_request( request, long_retries=long_retries, @@ -733,7 +775,7 @@ class MatrixFederationHttpClient(object): _sec_timeout = self.default_timeout body = await _handle_json_response( - self.reactor, _sec_timeout, request, response + self.reactor, _sec_timeout, request, response, start_ms, ) return body @@ -786,6 +828,8 @@ class MatrixFederationHttpClient(object): method="GET", destination=destination, path=path, query=args ) + start_ms = self.clock.time_msec() + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, @@ -796,7 +840,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -846,6 +890,8 @@ class MatrixFederationHttpClient(object): method="DELETE", destination=destination, path=path, query=args ) + start_ms = self.clock.time_msec() + response = await self._send_request( request, long_retries=long_retries, @@ -854,7 +900,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -914,12 +960,14 @@ class MatrixFederationHttpClient(object): ) raise logger.info( - "{%s} [%s] Completed: %d %s [%d bytes]", + "{%s} [%s] Completed: %d %s [%d bytes] %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), length, + request.method, + 
+            request.uri.decode("ascii"),
         )
         return (length, headers)

From e1af09dccbb9d3a87728b8acb3b7425bba35a1fa Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 7 Aug 2020 15:18:51 +0100
Subject: Newsfile
---
 changelog.d/8051.misc | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/8051.misc

diff --git a/changelog.d/8051.misc b/changelog.d/8051.misc
new file mode 100644
index 0000000000..7657b503c9
--- /dev/null
+++ b/changelog.d/8051.misc
@@ -0,0 +1 @@
+Remove the need to set `filters` in the logging config. This change is backwards compatible, as setting the filters explicitly still works.

From a1e9bb9eae7f90e93e0ba02e84e8216e3c2f447a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 11 Aug 2020 19:40:02 +0100
Subject: Add typing info to Notifier (#8058)
---
 changelog.d/8058.misc      |   1 +
 synapse/handlers/events.py |   4 --
 synapse/notifier.py        | 131 ++++++++++++++++++++++++++++-----------------
 synapse/server.pyi         |   6 +++
 tox.ini                    |   1 +
 5 files changed, 91 insertions(+), 52 deletions(-)
 create mode 100644 changelog.d/8058.misc

diff --git a/changelog.d/8058.misc b/changelog.d/8058.misc
new file mode 100644
index 0000000000..41a27e5d72
--- /dev/null
+++ b/changelog.d/8058.misc
@@ -0,0 +1 @@
+Add type hints to `Notifier`.
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 71a89f09c7..1924636c4d 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -57,13 +57,10 @@ class EventStreamHandler(BaseHandler):
         timeout=0,
         as_client_event=True,
         affect_presence=True,
-        only_keys=None,
         room_id=None,
         is_guest=False,
     ):
         """Fetches the events stream for a given user.
-
-        If `only_keys` is not None, events from keys will be sent down.
         """
 
         if room_id:
@@ -93,7 +90,6 @@ class EventStreamHandler(BaseHandler):
                 auth_user,
                 pagin_config,
                 timeout,
-                only_keys=only_keys,
                 is_guest=is_guest,
                 explicit_room_id=room_id,
             )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 22ab4a9da5..694efe7116 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -15,7 +15,17 @@
 import logging
 from collections import namedtuple
-from typing import Callable, Iterable, List, TypeVar
+from typing import (
+    Awaitable,
+    Callable,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    TypeVar,
+)
 
 from prometheus_client import Counter
 
@@ -24,12 +34,14 @@ from twisted.internet import defer
 import synapse.server
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
+from synapse.events import EventBase
 from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import PreserveLoggingContext
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import StreamToken
+from synapse.streams.config import PaginationConfig
+from synapse.types import Collection, StreamToken, UserID
 from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
@@ -77,7 +89,13 @@ class _NotifierUserStream(object):
     so that it can remove itself from the indexes in the Notifier class.
""" - def __init__(self, user_id, rooms, current_token, time_now_ms): + def __init__( + self, + user_id: str, + rooms: Collection[str], + current_token: StreamToken, + time_now_ms: int, + ): self.user_id = user_id self.rooms = set(rooms) self.current_token = current_token @@ -93,13 +111,13 @@ class _NotifierUserStream(object): with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) - def notify(self, stream_key, stream_id, time_now_ms): + def notify(self, stream_key: str, stream_id: int, time_now_ms: int): """Notify any listeners for this user of a new event from an event source. Args: - stream_key(str): The stream the event came from. - stream_id(str): The new id for the stream the event came from. - time_now_ms(int): The current time in milliseconds. + stream_key: The stream the event came from. + stream_id: The new id for the stream the event came from. + time_now_ms: The current time in milliseconds. """ self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) self.last_notified_token = self.current_token @@ -112,7 +130,7 @@ class _NotifierUserStream(object): self.notify_deferred = ObservableDeferred(defer.Deferred()) noify_deferred.callback(self.current_token) - def remove(self, notifier): + def remove(self, notifier: "Notifier"): """ Remove this listener from all the indexes in the Notifier it knows about. """ @@ -123,10 +141,10 @@ class _NotifierUserStream(object): notifier.user_to_user_stream.pop(self.user_id) - def count_listeners(self): + def count_listeners(self) -> int: return len(self.notify_deferred.observers()) - def new_listener(self, token): + def new_listener(self, token: StreamToken) -> _NotificationListener: """Returns a deferred that is resolved when there is a new token greater than the given token. @@ -159,14 +177,16 @@ class Notifier(object): UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 def __init__(self, hs: "synapse.server.HomeServer"): - self.user_to_user_stream = {} - self.room_to_user_streams = {} + self.user_to_user_stream = {} # type: Dict[str, _NotifierUserStream] + self.room_to_user_streams = {} # type: Dict[str, Set[_NotifierUserStream]] self.hs = hs self.storage = hs.get_storage() self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() - self.pending_new_room_events = [] + self.pending_new_room_events = ( + [] + ) # type: List[Tuple[int, EventBase, Collection[str]]] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -178,10 +198,9 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.federation_sender = None if hs.should_send_federation(): self.federation_sender = hs.get_federation_sender() - else: - self.federation_sender = None self.state_handler = hs.get_state_handler() @@ -193,12 +212,12 @@ class Notifier(object): # when rendering the metrics page, which is likely once per minute at # most when scraping it. 
         def count_listeners():
-            all_user_streams = set()
+            all_user_streams = set()  # type: Set[_NotifierUserStream]
 
-            for x in list(self.room_to_user_streams.values()):
-                all_user_streams |= x
-            for x in list(self.user_to_user_stream.values()):
-                all_user_streams.add(x)
+            for streams in list(self.room_to_user_streams.values()):
+                all_user_streams |= streams
+            for stream in list(self.user_to_user_stream.values()):
+                all_user_streams.add(stream)
 
             return sum(stream.count_listeners() for stream in all_user_streams)
 
@@ -223,7 +242,11 @@ class Notifier(object):
         self.replication_callbacks.append(cb)
 
     def on_new_room_event(
-        self, event, room_stream_id, max_room_stream_id, extra_users=[]
+        self,
+        event: EventBase,
+        room_stream_id: int,
+        max_room_stream_id: int,
+        extra_users: Collection[str] = [],
     ):
         """ Used by handlers to inform the notifier something has happened
         in the room, room event wise.
@@ -241,11 +264,11 @@ class Notifier(object):
 
         self.notify_replication()
 
-    def _notify_pending_new_room_events(self, max_room_stream_id):
+    def _notify_pending_new_room_events(self, max_room_stream_id: int):
         """Notify for the room events that were queued waiting for a
         previous event to be persisted.
         Args:
-            max_room_stream_id(int): The highest stream_id below which all
+            max_room_stream_id: The highest stream_id below which all
                 events have been persisted.
         """
         pending = self.pending_new_room_events
@@ -258,7 +281,9 @@ class Notifier(object):
             else:
                 self._on_new_room_event(event, room_stream_id, extra_users)
 
-    def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
+    def _on_new_room_event(
+        self, event: EventBase, room_stream_id: int, extra_users: Collection[str] = []
+    ):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         run_as_background_process(
@@ -275,13 +300,19 @@ class Notifier(object):
             "room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
         )
 
-    async def _notify_app_services(self, room_stream_id):
+    async def _notify_app_services(self, room_stream_id: int):
         try:
             await self.appservice_handler.notify_interested_services(room_stream_id)
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
+    def on_new_event(
+        self,
+        stream_key: str,
+        new_token: int,
+        users: Collection[str] = [],
+        rooms: Collection[str] = [],
+    ):
         """ Used to inform listeners that something has happened event wise.
 
         Will wake up all listeners for the given users and rooms.
@@ -307,14 +338,19 @@ class Notifier(object):
 
         self.notify_replication()
 
-    def on_new_replication_data(self):
+    def on_new_replication_data(self) -> None:
         """Used to inform replication listeners that something has happened
         without waking up any of the normal user event streams"""
         self.notify_replication()
 
     async def wait_for_events(
-        self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START
-    ):
+        self,
+        user_id: str,
+        timeout: int,
+        callback: Callable[[StreamToken, StreamToken], Awaitable[T]],
+        room_ids=None,
+        from_token=StreamToken.START,
+    ) -> T:
         """Wait until the callback returns a non-empty response or the
         timeout fires.
""" @@ -377,19 +413,16 @@ class Notifier(object): async def get_events_for( self, - user, - pagination_config, - timeout, - only_keys=None, - is_guest=False, - explicit_room_id=None, - ): + user: UserID, + pagination_config: PaginationConfig, + timeout: int, + is_guest: bool = False, + explicit_room_id: str = None, + ) -> EventStreamResult: """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. - If `only_keys` is not None, events from keys will be sent down. - If explicit_room_id is not set, the user's joined rooms will be polled for events. If explicit_room_id is set, that room will be polled for events only if @@ -404,11 +437,13 @@ class Notifier(object): room_ids, is_joined = await self._get_room_ids(user, explicit_room_id) is_peeking = not is_joined - async def check_for_updates(before_token, after_token): + async def check_for_updates( + before_token: StreamToken, after_token: StreamToken + ) -> EventStreamResult: if not after_token.is_after(before_token): return EventStreamResult([], (from_token, from_token)) - events = [] + events = [] # type: List[EventBase] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -417,8 +452,6 @@ class Notifier(object): after_id = getattr(after_token, keyname) if before_id == after_id: continue - if only_keys and name not in only_keys: - continue new_events, new_key = await source.get_new_events( user=user, @@ -476,7 +509,9 @@ class Notifier(object): return result - async def _get_room_ids(self, user, explicit_room_id): + async def _get_room_ids( + self, user: UserID, explicit_room_id: Optional[str] + ) -> Tuple[Collection[str], bool]: joined_room_ids = await self.store.get_rooms_for_user(user.to_string()) if explicit_room_id: if explicit_room_id in joined_room_ids: @@ -486,7 +521,7 @@ class Notifier(object): raise AuthError(403, "Non-joined access not allowed") return joined_room_ids, True - async def _is_world_readable(self, room_id): + async def _is_world_readable(self, room_id: str) -> bool: state = await self.state_handler.get_current_state( room_id, EventTypes.RoomHistoryVisibility, "" ) @@ -496,7 +531,7 @@ class Notifier(object): return False @log_function - def remove_expired_streams(self): + def remove_expired_streams(self) -> None: time_now_ms = self.clock.time_msec() expired_streams = [] expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS @@ -510,21 +545,21 @@ class Notifier(object): expired_stream.remove(self) @log_function - def _register_with_keys(self, user_stream): + def _register_with_keys(self, user_stream: _NotifierUserStream): self.user_to_user_stream[user_stream.user_id] = user_stream for room in user_stream.rooms: s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - def _user_joined_room(self, user_id, room_id): + def _user_joined_room(self, user_id: str, room_id: str): new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) new_user_stream.rooms.add(room_id) - def notify_replication(self): + def notify_replication(self) -> None: """Notify the any replication listeners that there's a new event""" for cb in self.replication_callbacks: cb() diff --git a/synapse/server.pyi b/synapse/server.pyi index 1aba408c21..6a9bbb4713 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -31,8 +31,10 @@ import 
 import synapse.state
 import synapse.storage
 from synapse.events.builder import EventBuilderFactory
+from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.handlers.typing import FollowerTypingHandler
 from synapse.replication.tcp.streams import Stream
+from synapse.streams.events import EventSources
 
 class HomeServer(object):
     @property
@@ -153,3 +155,7 @@ class HomeServer(object):
         pass
     def get_typing_handler(self) -> FollowerTypingHandler:
         pass
+    def get_event_sources(self) -> EventSources:
+        pass
+    def get_application_service_handler(self) -> ApplicationServicesHandler:
+        pass
diff --git a/tox.ini b/tox.ini
index 9a052c1e33..54f28990ce 100644
--- a/tox.ini
+++ b/tox.ini
@@ -198,6 +198,7 @@ commands = mypy \
             synapse/logging/ \
             synapse/metrics \
             synapse/module_api \
+            synapse/notifier.py \
             synapse/push/pusherpool.py \
             synapse/push/push_rule_evaluator.py \
             synapse/replication \

From 3c796e4159601d93df53000806f95bb42f9275a7 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 11 Aug 2020 21:08:43 +0100
Subject: Update changelog.d/8051.misc

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
---
 changelog.d/8051.misc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/changelog.d/8051.misc b/changelog.d/8051.misc
index 7657b503c9..9e472cd481 100644
--- a/changelog.d/8051.misc
+++ b/changelog.d/8051.misc
@@ -1 +1 @@
-Remove the need to set `filters` in the logging config. This change is backwards compatible, as setting the filters explicitly still works.
+It is no longer necessary to explicitly define `filters` in the logging configuration. (Continuing to do so is redundant but harmless.)
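A few editorial sketches follow, illustrating techniques used in the patches above. First, the buffered-logging arrangement that the new default log config (#8040) describes, reduced to a self-contained standard-library script; the file name and numbers here are examples rather than Synapse's defaults:

    import logging
    from logging.handlers import MemoryHandler, TimedRotatingFileHandler

    # The equivalent of the "file" handler: rotate at midnight, keeping
    # three rotated files in addition to the current one.
    file_handler = TimedRotatingFileHandler(
        "homeserver.log",
        when="midnight",
        backupCount=3,
        encoding="utf8",
    )

    # The equivalent of the "buffer" handler: hold up to `capacity` records
    # in memory, flushing to `target` when the buffer fills or when a record
    # at WARNING (numeric level 30, hence `flushLevel: 30` in the YAML) or
    # above arrives.
    buffer_handler = MemoryHandler(
        capacity=10,
        flushLevel=logging.WARNING,
        target=file_handler,
    )

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(buffer_handler)

    root.info("buffered until the buffer fills or a warning arrives")
    root.warning("this flushes everything buffered so far to disk")

The trade-off is the one the config comments describe: fewer, larger writes in exchange for a delay before INFO/DEBUG lines reach disk.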
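Second, the frozen-attrs pattern that #8050 applies to `MatrixFederationRequest`, reduced to a toy class (`Request`, its fields, and the URI prefix are hypothetical):

    import attr

    @attr.s(frozen=True)
    class Request:
        method = attr.ib(type=str)
        path = attr.ib(type=str)
        # A derived field: excluded from __init__ and computed afterwards.
        uri = attr.ib(init=False, type=str)

        def __attrs_post_init__(self):
            # Frozen instances reject ordinary attribute assignment, so
            # derived fields are set via object.__setattr__, as the patch does.
            object.__setattr__(self, "uri", "matrix://example" + self.path)

    req = Request(method="GET", path="/_matrix/federation/v1/version")

    # In-place mutation such as `req.path += "/"` now raises
    # attr.exceptions.FrozenInstanceError, so "modifying" a request means
    # building an evolved copy instead, much as the trailing-slash retry
    # now does:
    retry = attr.evolve(req, path=req.path + "/")
    assert retry.uri.endswith("/")  # __attrs_post_init__ ran again

Freezing is what makes it safe to pre-compute `uri` once and log it from several places without worrying that the path has been mutated in between.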
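Finally, a note on the `# type:` comments used throughout #8058: they are the comment form of variable annotations, which mypy treats identically to the inline syntax, presumably chosen for compatibility with the older Python versions Synapse still supported at the time. A minimal comparison, assuming nothing beyond the standard library:

    from typing import Dict, List, Set

    # Comment style, as in the patch:
    user_to_user_stream = {}  # type: Dict[str, Set[str]]
    pending = []  # type: List[int]

    # Inline annotation style (Python 3.6+):
    user_to_user_stream2: Dict[str, Set[str]] = {}
    pending2: List[int] = []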