diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index b23f8cc227..bc73982dc2 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.logging import opentracing as opentracing
+from synapse.storage.databases.main.registration import TokenLookupResult
from synapse.types import StateMap, UserID
from synapse.util.caches.lrucache import LruCache
from synapse.util.metrics import Measure
@@ -190,10 +191,6 @@ class Auth:
user_id, app_service = self._get_appservice_user_id(request)
if user_id:
- request.authenticated_entity = user_id
- opentracing.set_tag("authenticated_entity", user_id)
- opentracing.set_tag("appservice_id", app_service.id)
-
if ip_addr and self._track_appservice_user_ips:
await self.store.insert_client_ip(
user_id=user_id,
@@ -203,31 +200,38 @@ class Auth:
device_id="dummy-device", # stubbed
)
- return synapse.types.create_requester(user_id, app_service=app_service)
+ requester = synapse.types.create_requester(
+ user_id, app_service=app_service
+ )
+
+ request.requester = user_id
+ opentracing.set_tag("authenticated_entity", user_id)
+ opentracing.set_tag("user_id", user_id)
+ opentracing.set_tag("appservice_id", app_service.id)
+
+ return requester
user_info = await self.get_user_by_access_token(
access_token, rights, allow_expired=allow_expired
)
- user = user_info["user"]
- token_id = user_info["token_id"]
- is_guest = user_info["is_guest"]
- shadow_banned = user_info["shadow_banned"]
+ token_id = user_info.token_id
+ is_guest = user_info.is_guest
+ shadow_banned = user_info.shadow_banned
# Deny the request if the user account has expired.
if self._account_validity_enabled and not allow_expired:
- user_id = user.to_string()
- if await self.store.is_account_expired(user_id, self.clock.time_msec()):
+ if await self.store.is_account_expired(
+ user_info.user_id, self.clock.time_msec()
+ ):
raise AuthError(
403, "User account has expired", errcode=Codes.EXPIRED_ACCOUNT
)
- # device_id may not be present if get_user_by_access_token has been
- # stubbed out.
- device_id = user_info.get("device_id")
+ device_id = user_info.device_id
- if user and access_token and ip_addr:
+ if access_token and ip_addr:
await self.store.insert_client_ip(
- user_id=user.to_string(),
+ user_id=user_info.token_owner,
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
@@ -241,19 +245,23 @@ class Auth:
errcode=Codes.GUEST_ACCESS_FORBIDDEN,
)
- request.authenticated_entity = user.to_string()
- opentracing.set_tag("authenticated_entity", user.to_string())
- if device_id:
- opentracing.set_tag("device_id", device_id)
-
- return synapse.types.create_requester(
- user,
+ requester = synapse.types.create_requester(
+ user_info.user_id,
token_id,
is_guest,
shadow_banned,
device_id,
app_service=app_service,
+ authenticated_entity=user_info.token_owner,
)
+
+ request.requester = requester
+ opentracing.set_tag("authenticated_entity", user_info.token_owner)
+ opentracing.set_tag("user_id", user_info.user_id)
+ if device_id:
+ opentracing.set_tag("device_id", device_id)
+
+ return requester
except KeyError:
raise MissingClientTokenError()
@@ -289,7 +297,7 @@ class Auth:
async def get_user_by_access_token(
self, token: str, rights: str = "access", allow_expired: bool = False,
- ) -> dict:
+ ) -> TokenLookupResult:
""" Validate access token and get user_id from it
Args:
@@ -298,13 +306,7 @@ class Auth:
allow this
allow_expired: If False, raises an InvalidClientTokenError
if the token is expired
- Returns:
- dict that includes:
- `user` (UserID)
- `is_guest` (bool)
- `shadow_banned` (bool)
- `token_id` (int|None): access token id. May be None if guest
- `device_id` (str|None): device corresponding to access token
+
Raises:
InvalidClientTokenError if a user by that token exists, but the token is
expired
@@ -314,9 +316,9 @@ class Auth:
if rights == "access":
# first look in the database
- r = await self._look_up_user_by_access_token(token)
+ r = await self.store.get_user_by_access_token(token)
if r:
- valid_until_ms = r["valid_until_ms"]
+ valid_until_ms = r.valid_until_ms
if (
not allow_expired
and valid_until_ms is not None
@@ -333,7 +335,6 @@ class Auth:
# otherwise it needs to be a valid macaroon
try:
user_id, guest = self._parse_and_validate_macaroon(token, rights)
- user = UserID.from_string(user_id)
if rights == "access":
if not guest:
@@ -359,23 +360,17 @@ class Auth:
raise InvalidClientTokenError(
"Guest access token used for regular user"
)
- ret = {
- "user": user,
- "is_guest": True,
- "shadow_banned": False,
- "token_id": None,
+
+ ret = TokenLookupResult(
+ user_id=user_id,
+ is_guest=True,
# all guests get the same device id
- "device_id": GUEST_DEVICE_ID,
- }
+ device_id=GUEST_DEVICE_ID,
+ )
elif rights == "delete_pusher":
# We don't store these tokens in the database
- ret = {
- "user": user,
- "is_guest": False,
- "shadow_banned": False,
- "token_id": None,
- "device_id": None,
- }
+
+ ret = TokenLookupResult(user_id=user_id, is_guest=False)
else:
raise RuntimeError("Unknown rights setting %s", rights)
return ret
@@ -484,31 +479,15 @@ class Auth:
now = self.hs.get_clock().time_msec()
return now < expiry
- async def _look_up_user_by_access_token(self, token):
- ret = await self.store.get_user_by_access_token(token)
- if not ret:
- return None
-
- # we use ret.get() below because *lots* of unit tests stub out
- # get_user_by_access_token in a way where it only returns a couple of
- # the fields.
- user_info = {
- "user": UserID.from_string(ret.get("name")),
- "token_id": ret.get("token_id", None),
- "is_guest": False,
- "shadow_banned": ret.get("shadow_banned"),
- "device_id": ret.get("device_id"),
- "valid_until_ms": ret.get("valid_until_ms"),
- }
- return user_info
-
def get_appservice_by_req(self, request):
token = self.get_access_token_from_request(request)
service = self.store.get_app_service_by_token(token)
if not service:
logger.warning("Unrecognised appservice access token.")
raise InvalidClientTokenError()
- request.authenticated_entity = service.sender
+ request.requester = synapse.types.create_requester(
+ service.sender, app_service=service
+ )
return service
async def is_server_admin(self, user: UserID) -> bool:
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 3862d9c08f..3944780a42 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Iterable, List, Match, Optional
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import _CacheContext, cached
if TYPE_CHECKING:
from synapse.appservice.api import ApplicationServiceApi
@@ -52,11 +52,11 @@ class ApplicationService:
self,
token,
hostname,
+ id,
+ sender,
url=None,
namespaces=None,
hs_token=None,
- sender=None,
- id=None,
protocols=None,
rate_limited=True,
ip_range_whitelist=None,
@@ -164,9 +164,9 @@ class ApplicationService:
does_match = await self.matches_user_in_member_list(event.room_id, store)
return does_match
- @cached(num_args=1)
+ @cached(num_args=1, cache_context=True)
async def matches_user_in_member_list(
- self, room_id: str, store: "DataStore"
+ self, room_id: str, store: "DataStore", cache_context: _CacheContext,
) -> bool:
"""Check if this service is interested a room based upon it's membership
@@ -177,7 +177,9 @@ class ApplicationService:
Returns:
True if this service would like to know about this room.
"""
- member_list = await store.get_users_in_room(room_id)
+ member_list = await store.get_users_in_room(
+ room_id, on_invalidate=cache_context.invalidate
+ )
# check joined member events
for user_id in member_list:
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 6b7be28aee..d4e887a3e0 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -23,7 +23,6 @@ from string import Template
import yaml
from twisted.logger import (
- ILogObserver,
LogBeginner,
STDLibLogObserver,
eventAsText,
@@ -32,11 +31,9 @@ from twisted.logger import (
import synapse
from synapse.app import _base as appbase
-from synapse.logging._structured import (
- reload_structured_logging,
- setup_structured_logging,
-)
+from synapse.logging._structured import setup_structured_logging
from synapse.logging.context import LoggingContextFilter
+from synapse.logging.filter import MetadataFilter
from synapse.util.versionstring import get_version_string
from ._base import Config, ConfigError
@@ -48,7 +45,11 @@ DEFAULT_LOG_CONFIG = Template(
# This is a YAML file containing a standard Python logging configuration
# dictionary. See [1] for details on the valid settings.
#
+# Synapse also supports structured logging for machine readable logs which can
+# be ingested by ELK stacks. See [2] for details.
+#
# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
+# [2]: https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md
version: 1
@@ -176,11 +177,11 @@ class LoggingConfig(Config):
log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
-def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
+def _setup_stdlib_logging(config, log_config_path, logBeginner: LogBeginner) -> None:
"""
- Set up Python stdlib logging.
+ Set up Python standard library logging.
"""
- if log_config is None:
+ if log_config_path is None:
log_format = (
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
" - %(message)s"
@@ -196,7 +197,8 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
handler.setFormatter(formatter)
logger.addHandler(handler)
else:
- logging.config.dictConfig(log_config)
+ # Load the logging configuration.
+ _load_logging_config(log_config_path)
# We add a log record factory that runs all messages through the
# LoggingContextFilter so that we get the context *at the time we log*
@@ -204,12 +206,14 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
# filter options, but care must when using e.g. MemoryHandler to buffer
# writes.
- log_filter = LoggingContextFilter(request="")
+ log_context_filter = LoggingContextFilter(request="")
+ log_metadata_filter = MetadataFilter({"server_name": config.server_name})
old_factory = logging.getLogRecordFactory()
def factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
- log_filter.filter(record)
+ log_context_filter.filter(record)
+ log_metadata_filter.filter(record)
return record
logging.setLogRecordFactory(factory)
@@ -255,21 +259,40 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
if not config.no_redirect_stdio:
print("Redirected stdout/stderr to logs")
- return observer
-
-def _reload_stdlib_logging(*args, log_config=None):
- logger = logging.getLogger("")
+def _load_logging_config(log_config_path: str) -> None:
+ """
+ Configure logging from a log config path.
+ """
+ with open(log_config_path, "rb") as f:
+ log_config = yaml.safe_load(f.read())
if not log_config:
- logger.warning("Reloaded a blank config?")
+ logging.warning("Loaded a blank logging config?")
+
+ # If the old structured logging configuration is being used, convert it to
+ # the new style configuration.
+ if "structured" in log_config and log_config.get("structured"):
+ log_config = setup_structured_logging(log_config)
logging.config.dictConfig(log_config)
+def _reload_logging_config(log_config_path):
+ """
+ Reload the log configuration from the file and apply it.
+ """
+ # If no log config path was given, it cannot be reloaded.
+ if log_config_path is None:
+ return
+
+ _load_logging_config(log_config_path)
+ logging.info("Reloaded log config from %s due to SIGHUP", log_config_path)
+
+
def setup_logging(
hs, config, use_worker_options=False, logBeginner: LogBeginner = globalLogBeginner
-) -> ILogObserver:
+) -> None:
"""
Set up the logging subsystem.
@@ -282,41 +305,18 @@ def setup_logging(
logBeginner: The Twisted logBeginner to use.
- Returns:
- The "root" Twisted Logger observer, suitable for sending logs to from a
- Logger instance.
"""
- log_config = config.worker_log_config if use_worker_options else config.log_config
-
- def read_config(*args, callback=None):
- if log_config is None:
- return None
-
- with open(log_config, "rb") as f:
- log_config_body = yaml.safe_load(f.read())
-
- if callback:
- callback(log_config=log_config_body)
- logging.info("Reloaded log config from %s due to SIGHUP", log_config)
-
- return log_config_body
+ log_config_path = (
+ config.worker_log_config if use_worker_options else config.log_config
+ )
- log_config_body = read_config()
+ # Perform one-time logging configuration.
+ _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
+ # Add a SIGHUP handler to reload the logging configuration, if one is available.
+ appbase.register_sighup(_reload_logging_config, log_config_path)
- if log_config_body and log_config_body.get("structured") is True:
- logger = setup_structured_logging(
- hs, config, log_config_body, logBeginner=logBeginner
- )
- appbase.register_sighup(read_config, callback=reload_structured_logging)
- else:
- logger = _setup_stdlib_logging(config, log_config_body, logBeginner=logBeginner)
- appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
-
- # make sure that the first thing we log is a thing we can grep backwards
- # for
+ # Log immediately so we can grep backwards.
logging.warning("***** STARTING SERVER *****")
logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name)
logging.info("Instance name: %s", hs.get_instance_name())
-
- return logger
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index e203206865..8028663fa8 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -368,7 +368,7 @@ class FrozenEvent(EventBase):
return self.__repr__()
def __repr__(self):
- return "<FrozenEvent event_id='%s', type='%s', state_key='%s'>" % (
+ return "<FrozenEvent event_id=%r, type=%r, state_key=%r>" % (
self.get("event_id", None),
self.get("type", None),
self.get("state_key", None),
@@ -451,7 +451,7 @@ class FrozenEventV2(EventBase):
return self.__repr__()
def __repr__(self):
- return "<%s event_id='%s', type='%s', state_key='%s'>" % (
+ return "<%s event_id=%r, type=%r, state_key=%r>" % (
self.__class__.__name__,
self.event_id,
self.get("type", None),
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 7b4baddbf8..1f74c504a3 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -155,7 +155,7 @@ class Authenticator:
)
logger.debug("Request from %s", origin)
- request.authenticated_entity = origin
+ request.requester = origin
# If we get a valid signed request from the other side, its probably
# alive
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 2ed6d7d248..ce97fa70d7 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -299,7 +299,7 @@ class AccountValidityHandler:
self,
user_id: str,
expiration_ts: Optional[int] = None,
- email_sent: Optional[bool] = False,
+ email_sent: bool = False,
renewal_token: Optional[str] = None,
) -> int:
"""Renews the account attached to a given user by pushing back the
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 276594f3d9..ff103cbb92 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -991,17 +991,17 @@ class AuthHandler(BaseHandler):
# This might return an awaitable, if it does block the log out
# until it completes.
result = provider.on_logged_out(
- user_id=str(user_info["user"]),
- device_id=user_info["device_id"],
+ user_id=user_info.user_id,
+ device_id=user_info.device_id,
access_token=access_token,
)
if inspect.isawaitable(result):
await result
# delete pushers associated with this access token
- if user_info["token_id"] is not None:
+ if user_info.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
- str(user_info["user"]), (user_info["token_id"],)
+ user_info.user_id, (user_info.token_id,)
)
async def delete_access_tokens_for_user(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 49a00eed9c..8e014c9bb5 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -48,7 +48,7 @@ from synapse.util.wheel_timer import WheelTimer
MYPY = False
if MYPY:
- import synapse.server
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -101,7 +101,7 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class BasePresenceHandler(abc.ABC):
"""Parts of the PresenceHandler that are shared between workers and master"""
- def __init__(self, hs: "synapse.server.HomeServer"):
+ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
@@ -199,7 +199,7 @@ class BasePresenceHandler(abc.ABC):
class PresenceHandler(BasePresenceHandler):
- def __init__(self, hs: "synapse.server.HomeServer"):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
@@ -1011,7 +1011,7 @@ def format_user_presence_state(state, now, include_user_id=True):
class PresenceEventSource:
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
# We can't call get_presence_handler here because there's a cycle:
#
# Presence -> Notifier -> PresenceEventSource -> Presence
@@ -1071,12 +1071,14 @@ class PresenceEventSource:
users_interested_in = await self._get_interested_in(user, explicit_room_id)
- user_ids_changed = set()
+ user_ids_changed = set() # type: Collection[str]
changed = None
if from_key:
changed = stream_change_cache.get_all_entities_changed(from_key)
if changed is not None and len(changed) < 500:
+ assert isinstance(user_ids_changed, set)
+
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
get_updates_counter.labels("stream").inc()
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index c9a3a4a592..ab468c7d92 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -274,7 +274,7 @@ class ProfileHandler(BaseHandler):
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this homeserver")
- if not by_admin and requester and target_user != requester.user:
+ if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname")
if not by_admin and not self.hs.config.enable_set_displayname:
@@ -306,12 +306,6 @@ class ProfileHandler(BaseHandler):
else:
new_batchnum = None
- # If the admin changes the display name of a user, the requesting user cannot send
- # the join event to update the displayname in the rooms.
- # This must be done by the target user himself.
- if by_admin:
- requester = create_requester(target_user)
-
await self.store.set_profile_displayname(
target_user.localpart, displayname_to_set, new_batchnum
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index ae6a0b76d1..59ad075a4b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -130,7 +130,10 @@ class RegistrationHandler(BaseHandler):
# Retrieve guest user information from provided access token
user_data = await self.auth.get_user_by_access_token(guest_access_token)
- if not user_data["is_guest"] or user_data["user"].localpart != localpart:
+ if (
+ not user_data.is_guest
+ or UserID.from_string(user_data.user_id).localpart != localpart
+ ):
raise AuthError(
403,
"Cannot register taken user ID without valid guest "
@@ -239,8 +242,9 @@ class RegistrationHandler(BaseHandler):
)
if default_display_name:
+ requester = create_requester(user)
await self.profile_handler.set_displayname(
- user, None, default_display_name, by_admin=True
+ user, requester, default_display_name, by_admin=True
)
if self.hs.config.user_directory_search_all_users:
@@ -274,8 +278,9 @@ class RegistrationHandler(BaseHandler):
shadow_banned=shadow_banned,
)
+ requester = create_requester(user)
await self.profile_handler.set_displayname(
- user, None, default_display_name, by_admin=True
+ user, requester, default_display_name, by_admin=True
)
# Successfully registered
@@ -542,8 +547,9 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=display_name,
)
+ requester = create_requester(user)
await self.profile_handler.set_displayname(
- user, None, display_name, by_admin=True
+ user, requester, display_name, by_admin=True
)
if self.hs.config.user_directory_search_all_users:
@@ -858,7 +864,7 @@ class RegistrationHandler(BaseHandler):
# up when the access token is saved, but that's quite an
# invasive change I'd rather do separately.
user_tuple = await self.store.get_user_by_access_token(token)
- token_id = user_tuple["token_id"]
+ token_id = user_tuple.token_id
await self.pusher_pool.add_pusher(
user_id=user_id,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6457d40672..43472b3288 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -789,23 +789,30 @@ class RoomCreationHandler(BaseHandler):
ratelimit=False,
)
- for invitee in invite_list:
+ # we avoid dropping the lock between invites, as otherwise joins can
+ # start coming in and making the createRoom slow.
+ #
+ # we also don't need to check the requester's shadow-ban here, as we
+ # have already done so above (and potentially emptied invite_list).
+ with (await self.room_member_handler.member_linearizer.queue((room_id,))):
content = {}
is_direct = config.get("is_direct", None)
if is_direct:
content["is_direct"] = is_direct
- # Note that update_membership with an action of "invite" can raise a
- # ShadowBanError, but this was handled above by emptying invite_list.
- _, last_stream_id = await self.room_member_handler.update_membership(
- requester,
- UserID.from_string(invitee),
- room_id,
- "invite",
- ratelimit=False,
- content=content,
- new_room=True,
- )
+ for invitee in invite_list:
+ (
+ _,
+ last_stream_id,
+ ) = await self.room_member_handler.update_membership_locked(
+ requester,
+ UserID.from_string(invitee),
+ room_id,
+ "invite",
+ ratelimit=False,
+ content=content,
+ new_room=True,
+ )
for invite_3pid in invite_3pid_list:
id_server = invite_3pid["id_server"]
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index bcdd0fbdd6..da2cd6dff2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -308,7 +308,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
key = (room_id,)
with (await self.member_linearizer.queue(key)):
- result = await self._update_membership(
+ result = await self.update_membership_locked(
requester,
target,
room_id,
@@ -324,7 +324,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
return result
- async def _update_membership(
+ async def update_membership_locked(
self,
requester: Requester,
target: UserID,
@@ -338,6 +338,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room: bool = False,
require_consent: bool = True,
) -> Tuple[str, int]:
+ """Helper for update_membership.
+
+ Assumes that the membership linearizer is already held for the room.
+ """
content_specified = bool(content)
if content is None:
content = {}
diff --git a/synapse/http/site.py b/synapse/http/site.py
index ddb1770b09..5f0581dc3f 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,7 +14,7 @@
import contextlib
import logging
import time
-from typing import Optional
+from typing import Optional, Union
from twisted.python.failure import Failure
from twisted.web.server import Request, Site
@@ -23,6 +23,7 @@ from synapse.config.server import ListenerConfig
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics, requests_counter
from synapse.logging.context import LoggingContext, PreserveLoggingContext
+from synapse.types import Requester
logger = logging.getLogger(__name__)
@@ -54,9 +55,12 @@ class SynapseRequest(Request):
Request.__init__(self, channel, *args, **kw)
self.site = channel.site
self._channel = channel # this is used by the tests
- self.authenticated_entity = None
self.start_time = 0.0
+ # The requester, if authenticated. For federation requests this is the
+ # server name, for client requests this is the Requester object.
+ self.requester = None # type: Optional[Union[Requester, str]]
+
# we can't yet create the logcontext, as we don't know the method.
self.logcontext = None # type: Optional[LoggingContext]
@@ -271,11 +275,23 @@ class SynapseRequest(Request):
# to the client (nb may be negative)
response_send_time = self.finish_time - self._processing_finished_time
- # need to decode as it could be raw utf-8 bytes
- # from a IDN servname in an auth header
- authenticated_entity = self.authenticated_entity
- if authenticated_entity is not None and isinstance(authenticated_entity, bytes):
- authenticated_entity = authenticated_entity.decode("utf-8", "replace")
+ # Convert the requester into a string that we can log
+ authenticated_entity = None
+ if isinstance(self.requester, str):
+ authenticated_entity = self.requester
+ elif isinstance(self.requester, Requester):
+ authenticated_entity = self.requester.authenticated_entity
+
+ # If this is a request where the target user doesn't match the user who
+ # authenticated (e.g. and admin is puppetting a user) then we log both.
+ if self.requester.user.to_string() != authenticated_entity:
+ authenticated_entity = "{},{}".format(
+ authenticated_entity, self.requester.user.to_string(),
+ )
+ elif self.requester is not None:
+ # This shouldn't happen, but we log it so we don't lose information
+ # and can see that we're doing something wrong.
+ authenticated_entity = repr(self.requester) # type: ignore[unreachable]
# ...or could be raw utf-8 bytes in the User-Agent header.
# N.B. if you don't do this, the logger explodes cryptically
diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py
index e69de29bb2..b28b7b2ef7 100644
--- a/synapse/logging/__init__.py
+++ b/synapse/logging/__init__.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These are imported to allow for nicer logging configuration files.
+from synapse.logging._remote import RemoteHandler
+from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
+
+__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 0caf325916..fb937b3f28 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import sys
import traceback
from collections import deque
@@ -21,10 +22,11 @@ from math import floor
from typing import Callable, Optional
import attr
+from typing_extensions import Deque
from zope.interface import implementer
from twisted.application.internet import ClientService
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import CancelledError, Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
@@ -32,7 +34,9 @@ from twisted.internet.endpoints import (
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
-from twisted.logger import ILogObserver, Logger, LogLevel
+from twisted.python.failure import Failure
+
+logger = logging.getLogger(__name__)
@attr.s
@@ -45,11 +49,11 @@ class LogProducer:
Args:
buffer: Log buffer to read logs from.
transport: Transport to write to.
- format_event: A callable to format the log entry to a string.
+ format: A callable to format the log record to a string.
"""
transport = attr.ib(type=ITransport)
- format_event = attr.ib(type=Callable[[dict], str])
+ _format = attr.ib(type=Callable[[logging.LogRecord], str])
_buffer = attr.ib(type=deque)
_paused = attr.ib(default=False, type=bool, init=False)
@@ -61,16 +65,19 @@ class LogProducer:
self._buffer = deque()
def resumeProducing(self):
+ # If we're already producing, nothing to do.
self._paused = False
+ # Loop until paused.
while self._paused is False and (self._buffer and self.transport.connected):
try:
- # Request the next event and format it.
- event = self._buffer.popleft()
- msg = self.format_event(event)
+ # Request the next record and format it.
+ record = self._buffer.popleft()
+ msg = self._format(record)
# Send it as a new line over the transport.
self.transport.write(msg.encode("utf8"))
+ self.transport.write(b"\n")
except Exception:
# Something has gone wrong writing to the transport -- log it
# and break out of the while.
@@ -78,76 +85,85 @@ class LogProducer:
break
-@attr.s
-@implementer(ILogObserver)
-class TCPLogObserver:
+class RemoteHandler(logging.Handler):
"""
- An IObserver that writes JSON logs to a TCP target.
+ An logging handler that writes logs to a TCP target.
Args:
- hs (HomeServer): The homeserver that is being logged for.
host: The host of the logging target.
port: The logging target's port.
- format_event: A callable to format the log entry to a string.
maximum_buffer: The maximum buffer size.
"""
- hs = attr.ib()
- host = attr.ib(type=str)
- port = attr.ib(type=int)
- format_event = attr.ib(type=Callable[[dict], str])
- maximum_buffer = attr.ib(type=int)
- _buffer = attr.ib(default=attr.Factory(deque), type=deque)
- _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
- _logger = attr.ib(default=attr.Factory(Logger))
- _producer = attr.ib(default=None, type=Optional[LogProducer])
-
- def start(self) -> None:
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ maximum_buffer: int = 1000,
+ level=logging.NOTSET,
+ _reactor=None,
+ ):
+ super().__init__(level=level)
+ self.host = host
+ self.port = port
+ self.maximum_buffer = maximum_buffer
+
+ self._buffer = deque() # type: Deque[logging.LogRecord]
+ self._connection_waiter = None # type: Optional[Deferred]
+ self._producer = None # type: Optional[LogProducer]
# Connect without DNS lookups if it's a direct IP.
+ if _reactor is None:
+ from twisted.internet import reactor
+
+ _reactor = reactor
+
try:
ip = ip_address(self.host)
if isinstance(ip, IPv4Address):
- endpoint = TCP4ClientEndpoint(
- self.hs.get_reactor(), self.host, self.port
- )
+ endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
elif isinstance(ip, IPv6Address):
- endpoint = TCP6ClientEndpoint(
- self.hs.get_reactor(), self.host, self.port
- )
+ endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
else:
raise ValueError("Unknown IP address provided: %s" % (self.host,))
except ValueError:
- endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
+ endpoint = HostnameEndpoint(_reactor, self.host, self.port)
factory = Factory.forProtocol(Protocol)
- self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
+ self._service = ClientService(endpoint, factory, clock=_reactor)
self._service.startService()
+ self._stopping = False
self._connect()
- def stop(self):
+ def close(self):
+ self._stopping = True
self._service.stopService()
def _connect(self) -> None:
"""
Triggers an attempt to connect then write to the remote if not already writing.
"""
+ # Do not attempt to open multiple connections.
if self._connection_waiter:
return
self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
- @self._connection_waiter.addErrback
- def fail(r):
- r.printTraceback(file=sys.__stderr__)
+ def fail(failure: Failure) -> None:
+ # If the Deferred was cancelled (e.g. during shutdown) do not try to
+ # reconnect (this will cause an infinite loop of errors).
+ if failure.check(CancelledError) and self._stopping:
+ return
+
+ # For a different error, print the traceback and re-connect.
+ failure.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()
- @self._connection_waiter.addCallback
- def writer(r):
+ def writer(result: Protocol) -> None:
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
- if self._producer and r.transport is self._producer.transport:
+ if self._producer and result.transport is self._producer.transport:
self._producer.resumeProducing()
self._connection_waiter = None
return
@@ -158,29 +174,29 @@ class TCPLogObserver:
# Make a new producer and start it.
self._producer = LogProducer(
- buffer=self._buffer,
- transport=r.transport,
- format_event=self.format_event,
+ buffer=self._buffer, transport=result.transport, format=self.format,
)
- r.transport.registerProducer(self._producer, True)
+ result.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None
+ self._connection_waiter.addCallbacks(writer, fail)
+
def _handle_pressure(self) -> None:
"""
- Handle backpressure by shedding events.
+ Handle backpressure by shedding records.
The buffer will, in this order, until the buffer is below the maximum:
- - Shed DEBUG events
- - Shed INFO events
- - Shed the middle 50% of the events.
+ - Shed DEBUG records.
+ - Shed INFO records.
+ - Shed the middle 50% of the records.
"""
if len(self._buffer) <= self.maximum_buffer:
return
# Strip out DEBUGs
self._buffer = deque(
- filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer)
+ filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
)
if len(self._buffer) <= self.maximum_buffer:
@@ -188,7 +204,7 @@ class TCPLogObserver:
# Strip out INFOs
self._buffer = deque(
- filter(lambda event: event["log_level"] != LogLevel.info, self._buffer)
+ filter(lambda record: record.levelno > logging.INFO, self._buffer)
)
if len(self._buffer) <= self.maximum_buffer:
@@ -209,17 +225,17 @@ class TCPLogObserver:
self._buffer.extend(reversed(end_buffer))
- def __call__(self, event: dict) -> None:
- self._buffer.append(event)
+ def emit(self, record: logging.LogRecord) -> None:
+ self._buffer.append(record)
# Handle backpressure, if it exists.
try:
self._handle_pressure()
except Exception:
- # If handling backpressure fails,clear the buffer and log the
+ # If handling backpressure fails, clear the buffer and log the
# exception.
self._buffer.clear()
- self._logger.failure("Failed clearing backpressure")
+ logger.warning("Failed clearing backpressure")
# Try and write immediately.
self._connect()
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
index 0fc2ea609e..14d9c104c2 100644
--- a/synapse/logging/_structured.py
+++ b/synapse/logging/_structured.py
@@ -12,138 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
import os.path
-import sys
-import typing
-import warnings
-from typing import List
+from typing import Any, Dict, Generator, Optional, Tuple
-import attr
-from constantly import NamedConstant, Names, ValueConstant, Values
-from zope.interface import implementer
-
-from twisted.logger import (
- FileLogObserver,
- FilteringLogObserver,
- ILogObserver,
- LogBeginner,
- Logger,
- LogLevel,
- LogLevelFilterPredicate,
- LogPublisher,
- eventAsText,
- jsonFileLogObserver,
-)
+from constantly import NamedConstant, Names
from synapse.config._base import ConfigError
-from synapse.logging._terse_json import (
- TerseJSONToConsoleLogObserver,
- TerseJSONToTCPLogObserver,
-)
-from synapse.logging.context import current_context
-
-
-def stdlib_log_level_to_twisted(level: str) -> LogLevel:
- """
- Convert a stdlib log level to Twisted's log level.
- """
- lvl = level.lower().replace("warning", "warn")
- return LogLevel.levelWithName(lvl)
-
-
-@attr.s
-@implementer(ILogObserver)
-class LogContextObserver:
- """
- An ILogObserver which adds Synapse-specific log context information.
-
- Attributes:
- observer (ILogObserver): The target parent observer.
- """
-
- observer = attr.ib()
-
- def __call__(self, event: dict) -> None:
- """
- Consume a log event and emit it to the parent observer after filtering
- and adding log context information.
-
- Args:
- event (dict)
- """
- # Filter out some useless events that Twisted outputs
- if "log_text" in event:
- if event["log_text"].startswith("DNSDatagramProtocol starting on "):
- return
-
- if event["log_text"].startswith("(UDP Port "):
- return
-
- if event["log_text"].startswith("Timing out client") or event[
- "log_format"
- ].startswith("Timing out client"):
- return
-
- context = current_context()
-
- # Copy the context information to the log event.
- context.copy_to_twisted_log_entry(event)
-
- self.observer(event)
-
-
-class PythonStdlibToTwistedLogger(logging.Handler):
- """
- Transform a Python stdlib log message into a Twisted one.
- """
-
- def __init__(self, observer, *args, **kwargs):
- """
- Args:
- observer (ILogObserver): A Twisted logging observer.
- *args, **kwargs: Args/kwargs to be passed to logging.Handler.
- """
- self.observer = observer
- super().__init__(*args, **kwargs)
-
- def emit(self, record: logging.LogRecord) -> None:
- """
- Emit a record to Twisted's observer.
-
- Args:
- record (logging.LogRecord)
- """
-
- self.observer(
- {
- "log_time": record.created,
- "log_text": record.getMessage(),
- "log_format": "{log_text}",
- "log_namespace": record.name,
- "log_level": stdlib_log_level_to_twisted(record.levelname),
- }
- )
-
-
-def SynapseFileLogObserver(outFile: typing.IO[str]) -> FileLogObserver:
- """
- A log observer that formats events like the traditional log formatter and
- sends them to `outFile`.
-
- Args:
- outFile (file object): The file object to write to.
- """
-
- def formatEvent(_event: dict) -> str:
- event = dict(_event)
- event["log_level"] = event["log_level"].name.upper()
- event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
- event.get("log_format", "{log_text}") or "{log_text}"
- )
- return eventAsText(event, includeSystem=False) + "\n"
-
- return FileLogObserver(outFile, formatEvent)
class DrainType(Names):
@@ -155,30 +29,12 @@ class DrainType(Names):
NETWORK_JSON_TERSE = NamedConstant()
-class OutputPipeType(Values):
- stdout = ValueConstant(sys.__stdout__)
- stderr = ValueConstant(sys.__stderr__)
-
-
-@attr.s
-class DrainConfiguration:
- name = attr.ib()
- type = attr.ib()
- location = attr.ib()
- options = attr.ib(default=None)
-
-
-@attr.s
-class NetworkJSONTerseOptions:
- maximum_buffer = attr.ib(type=int)
-
-
-DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
+DEFAULT_LOGGERS = {"synapse": {"level": "info"}}
def parse_drain_configs(
drains: dict,
-) -> typing.Generator[DrainConfiguration, None, None]:
+) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
"""
Parse the drain configurations.
@@ -186,11 +42,12 @@ def parse_drain_configs(
drains (dict): A list of drain configurations.
Yields:
- DrainConfiguration instances.
+ dict instances representing a logging handler.
Raises:
ConfigError: If any of the drain configuration items are invalid.
"""
+
for name, config in drains.items():
if "type" not in config:
raise ConfigError("Logging drains require a 'type' key.")
@@ -202,6 +59,18 @@ def parse_drain_configs(
"%s is not a known logging drain type." % (config["type"],)
)
+ # Either use the default formatter or the tersejson one.
+ if logging_type in (DrainType.CONSOLE_JSON, DrainType.FILE_JSON,):
+ formatter = "json" # type: Optional[str]
+ elif logging_type in (
+ DrainType.CONSOLE_JSON_TERSE,
+ DrainType.NETWORK_JSON_TERSE,
+ ):
+ formatter = "tersejson"
+ else:
+ # A formatter of None implies using the default formatter.
+ formatter = None
+
if logging_type in [
DrainType.CONSOLE,
DrainType.CONSOLE_JSON,
@@ -217,9 +86,11 @@ def parse_drain_configs(
% (logging_type,)
)
- pipe = OutputPipeType.lookupByName(location).value
-
- yield DrainConfiguration(name=name, type=logging_type, location=pipe)
+ yield name, {
+ "class": "logging.StreamHandler",
+ "formatter": formatter,
+ "stream": "ext://sys." + location,
+ }
elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
if "location" not in config:
@@ -233,18 +104,25 @@ def parse_drain_configs(
"File paths need to be absolute, '%s' is a relative path"
% (location,)
)
- yield DrainConfiguration(name=name, type=logging_type, location=location)
+
+ yield name, {
+ "class": "logging.FileHandler",
+ "formatter": formatter,
+ "filename": location,
+ }
elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
host = config.get("host")
port = config.get("port")
maximum_buffer = config.get("maximum_buffer", 1000)
- yield DrainConfiguration(
- name=name,
- type=logging_type,
- location=(host, port),
- options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
- )
+
+ yield name, {
+ "class": "synapse.logging.RemoteHandler",
+ "formatter": formatter,
+ "host": host,
+ "port": port,
+ "maximum_buffer": maximum_buffer,
+ }
else:
raise ConfigError(
@@ -253,126 +131,29 @@ def parse_drain_configs(
)
-class StoppableLogPublisher(LogPublisher):
+def setup_structured_logging(log_config: dict,) -> dict:
"""
- A log publisher that can tell its observers to shut down any external
- communications.
- """
-
- def stop(self):
- for obs in self._observers:
- if hasattr(obs, "stop"):
- obs.stop()
-
-
-def setup_structured_logging(
- hs,
- config,
- log_config: dict,
- logBeginner: LogBeginner,
- redirect_stdlib_logging: bool = True,
-) -> LogPublisher:
- """
- Set up Twisted's structured logging system.
-
- Args:
- hs: The homeserver to use.
- config (HomeserverConfig): The configuration of the Synapse homeserver.
- log_config (dict): The log configuration to use.
+ Convert a legacy structured logging configuration (from Synapse < v1.23.0)
+ to one compatible with the new standard library handlers.
"""
- if config.no_redirect_stdio:
- raise ConfigError(
- "no_redirect_stdio cannot be defined using structured logging."
- )
-
- logger = Logger()
-
if "drains" not in log_config:
raise ConfigError("The logging configuration requires a list of drains.")
- observers = [] # type: List[ILogObserver]
-
- for observer in parse_drain_configs(log_config["drains"]):
- # Pipe drains
- if observer.type == DrainType.CONSOLE:
- logger.debug(
- "Starting up the {name} console logger drain", name=observer.name
- )
- observers.append(SynapseFileLogObserver(observer.location))
- elif observer.type == DrainType.CONSOLE_JSON:
- logger.debug(
- "Starting up the {name} JSON console logger drain", name=observer.name
- )
- observers.append(jsonFileLogObserver(observer.location))
- elif observer.type == DrainType.CONSOLE_JSON_TERSE:
- logger.debug(
- "Starting up the {name} terse JSON console logger drain",
- name=observer.name,
- )
- observers.append(
- TerseJSONToConsoleLogObserver(observer.location, metadata={})
- )
-
- # File drains
- elif observer.type == DrainType.FILE:
- logger.debug("Starting up the {name} file logger drain", name=observer.name)
- log_file = open(observer.location, "at", buffering=1, encoding="utf8")
- observers.append(SynapseFileLogObserver(log_file))
- elif observer.type == DrainType.FILE_JSON:
- logger.debug(
- "Starting up the {name} JSON file logger drain", name=observer.name
- )
- log_file = open(observer.location, "at", buffering=1, encoding="utf8")
- observers.append(jsonFileLogObserver(log_file))
-
- elif observer.type == DrainType.NETWORK_JSON_TERSE:
- metadata = {"server_name": hs.config.server_name}
- log_observer = TerseJSONToTCPLogObserver(
- hs=hs,
- host=observer.location[0],
- port=observer.location[1],
- metadata=metadata,
- maximum_buffer=observer.options.maximum_buffer,
- )
- log_observer.start()
- observers.append(log_observer)
- else:
- # We should never get here, but, just in case, throw an error.
- raise ConfigError("%s drain type cannot be configured" % (observer.type,))
-
- publisher = StoppableLogPublisher(*observers)
- log_filter = LogLevelFilterPredicate()
-
- for namespace, namespace_config in log_config.get(
- "loggers", DEFAULT_LOGGERS
- ).items():
- # Set the log level for twisted.logger.Logger namespaces
- log_filter.setLogLevelForNamespace(
- namespace,
- stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
- )
-
- # Also set the log levels for the stdlib logger namespaces, to prevent
- # them getting to PythonStdlibToTwistedLogger and having to be formatted
- if "level" in namespace_config:
- logging.getLogger(namespace).setLevel(namespace_config.get("level"))
-
- f = FilteringLogObserver(publisher, [log_filter])
- lco = LogContextObserver(f)
-
- if redirect_stdlib_logging:
- stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
- stdliblogger = logging.getLogger()
- stdliblogger.addHandler(stuff_into_twisted)
-
- # Always redirect standard I/O, otherwise other logging outputs might miss
- # it.
- logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
+ new_config = {
+ "version": 1,
+ "formatters": {
+ "json": {"class": "synapse.logging.JsonFormatter"},
+ "tersejson": {"class": "synapse.logging.TerseJsonFormatter"},
+ },
+ "handlers": {},
+ "loggers": log_config.get("loggers", DEFAULT_LOGGERS),
+ "root": {"handlers": []},
+ }
- return publisher
+ for handler_name, handler in parse_drain_configs(log_config["drains"]):
+ new_config["handlers"][handler_name] = handler
+ # Add each handler to the root logger.
+ new_config["root"]["handlers"].append(handler_name)
-def reload_structured_logging(*args, log_config=None) -> None:
- warnings.warn(
- "Currently the structured logging system can not be reloaded, doing nothing"
- )
+ return new_config
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 9b46956ca9..2fbf5549a1 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -16,141 +16,65 @@
"""
Log formatters that output terse JSON.
"""
-
import json
-from typing import IO
-
-from twisted.logger import FileLogObserver
-
-from synapse.logging._remote import TCPLogObserver
+import logging
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
-
-def flatten_event(event: dict, metadata: dict, include_time: bool = False):
- """
- Flatten a Twisted logging event to an dictionary capable of being sent
- as a log event to a logging aggregation system.
-
- The format is vastly simplified and is not designed to be a "human readable
- string" in the sense that traditional logs are. Instead, the structure is
- optimised for searchability and filtering, with human-understandable log
- keys.
-
- Args:
- event (dict): The Twisted logging event we are flattening.
- metadata (dict): Additional data to include with each log message. This
- can be information like the server name. Since the target log
- consumer does not know who we are other than by host IP, this
- allows us to forward through static information.
- include_time (bool): Should we include the `time` key? If False, the
- event time is stripped from the event.
- """
- new_event = {}
-
- # If it's a failure, make the new event's log_failure be the traceback text.
- if "log_failure" in event:
- new_event["log_failure"] = event["log_failure"].getTraceback()
-
- # If it's a warning, copy over a string representation of the warning.
- if "warning" in event:
- new_event["warning"] = str(event["warning"])
-
- # Stdlib logging events have "log_text" as their human-readable portion,
- # Twisted ones have "log_format". For now, include the log_format, so that
- # context only given in the log format (e.g. what is being logged) is
- # available.
- if "log_text" in event:
- new_event["log"] = event["log_text"]
- else:
- new_event["log"] = event["log_format"]
-
- # We want to include the timestamp when forwarding over the network, but
- # exclude it when we are writing to stdout. This is because the log ingester
- # (e.g. logstash, fluentd) can add its own timestamp.
- if include_time:
- new_event["time"] = round(event["log_time"], 2)
-
- # Convert the log level to a textual representation.
- new_event["level"] = event["log_level"].name.upper()
-
- # Ignore these keys, and do not transfer them over to the new log object.
- # They are either useless (isError), transferred manually above (log_time,
- # log_level, etc), or contain Python objects which are not useful for output
- # (log_logger, log_source).
- keys_to_delete = [
- "isError",
- "log_failure",
- "log_format",
- "log_level",
- "log_logger",
- "log_source",
- "log_system",
- "log_time",
- "log_text",
- "observer",
- "warning",
- ]
-
- # If it's from the Twisted legacy logger (twisted.python.log), it adds some
- # more keys we want to purge.
- if event.get("log_namespace") == "log_legacy":
- keys_to_delete.extend(["message", "system", "time"])
-
- # Rather than modify the dictionary in place, construct a new one with only
- # the content we want. The original event should be considered 'frozen'.
- for key in event.keys():
-
- if key in keys_to_delete:
- continue
-
- if isinstance(event[key], (str, int, bool, float)) or event[key] is None:
- # If it's a plain type, include it as is.
- new_event[key] = event[key]
- else:
- # If it's not one of those basic types, write out a string
- # representation. This should probably be a warning in development,
- # so that we are sure we are only outputting useful data.
- new_event[key] = str(event[key])
-
- # Add the metadata information to the event (e.g. the server_name).
- new_event.update(metadata)
-
- return new_event
-
-
-def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogObserver:
- """
- A log observer that formats events to a flattened JSON representation.
-
- Args:
- outFile: The file object to write to.
- metadata: Metadata to be added to each log object.
- """
-
- def formatEvent(_event: dict) -> str:
- flattened = flatten_event(_event, metadata)
- return _encoder.encode(flattened) + "\n"
-
- return FileLogObserver(outFile, formatEvent)
-
-
-def TerseJSONToTCPLogObserver(
- hs, host: str, port: int, metadata: dict, maximum_buffer: int
-) -> FileLogObserver:
- """
- A log observer that formats events to a flattened JSON representation.
-
- Args:
- hs (HomeServer): The homeserver that is being logged for.
- host: The host of the logging target.
- port: The logging target's port.
- metadata: Metadata to be added to each log object.
- maximum_buffer: The maximum buffer size.
- """
-
- def formatEvent(_event: dict) -> str:
- flattened = flatten_event(_event, metadata, include_time=True)
- return _encoder.encode(flattened) + "\n"
-
- return TCPLogObserver(hs, host, port, formatEvent, maximum_buffer)
+# The properties of a standard LogRecord.
+_LOG_RECORD_ATTRIBUTES = {
+ "args",
+ "asctime",
+ "created",
+ "exc_info",
+ # exc_text isn't a public attribute, but is used to cache the result of formatException.
+ "exc_text",
+ "filename",
+ "funcName",
+ "levelname",
+ "levelno",
+ "lineno",
+ "message",
+ "module",
+ "msecs",
+ "msg",
+ "name",
+ "pathname",
+ "process",
+ "processName",
+ "relativeCreated",
+ "stack_info",
+ "thread",
+ "threadName",
+}
+
+
+class JsonFormatter(logging.Formatter):
+ def format(self, record: logging.LogRecord) -> str:
+ event = {
+ "log": record.getMessage(),
+ "namespace": record.name,
+ "level": record.levelname,
+ }
+
+ return self._format(record, event)
+
+ def _format(self, record: logging.LogRecord, event: dict) -> str:
+ # Add any extra attributes to the event.
+ for key, value in record.__dict__.items():
+ if key not in _LOG_RECORD_ATTRIBUTES:
+ event[key] = value
+
+ return _encoder.encode(event)
+
+
+class TerseJsonFormatter(JsonFormatter):
+ def format(self, record: logging.LogRecord) -> str:
+ event = {
+ "log": record.getMessage(),
+ "namespace": record.name,
+ "level": record.levelname,
+ "time": round(record.created, 2),
+ }
+
+ return self._format(record, event)
diff --git a/synapse/logging/filter.py b/synapse/logging/filter.py
new file mode 100644
index 0000000000..1baf8dd679
--- /dev/null
+++ b/synapse/logging/filter.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from typing_extensions import Literal
+
+
+class MetadataFilter(logging.Filter):
+ """Logging filter that adds constant values to each record.
+
+ Args:
+ metadata: Key-value pairs to add to each record.
+ """
+
+ def __init__(self, metadata: dict):
+ self._metadata = metadata
+
+ def filter(self, record: logging.LogRecord) -> Literal[True]:
+ for key, value in self._metadata.items():
+ setattr(record, key, value)
+ return True
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index e7cc74a5d2..f0c37eaf5e 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -77,8 +77,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
requester = Requester.deserialize(self.store, content["requester"])
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
logger.info("remote_join: %s into room: %s", user_id, room_id)
@@ -142,8 +141,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
requester = Requester.deserialize(self.store, content["requester"])
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
# hopefully we're now on the master, so this won't recurse!
event_id, stream_id = await self.member_handler.remote_reject_invite(
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index fc129dbaa7..8fa104c8d3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -115,8 +115,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
logger.info(
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 957415e359..e0a70f3ca1 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -18,6 +18,8 @@ import logging
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+import attr
+
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -38,6 +40,35 @@ THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
logger = logging.getLogger(__name__)
+@attr.s(frozen=True, slots=True)
+class TokenLookupResult:
+ """Result of looking up an access token.
+
+ Attributes:
+ user_id: The user that this token authenticates as
+ is_guest
+ shadow_banned
+ token_id: The ID of the access token looked up
+ device_id: The device associated with the token, if any.
+ valid_until_ms: The timestamp the token expires, if any.
+ token_owner: The "owner" of the token. This is either the same as the
+ user, or a server admin who is logged in as the user.
+ """
+
+ user_id = attr.ib(type=str)
+ is_guest = attr.ib(type=bool, default=False)
+ shadow_banned = attr.ib(type=bool, default=False)
+ token_id = attr.ib(type=Optional[int], default=None)
+ device_id = attr.ib(type=Optional[str], default=None)
+ valid_until_ms = attr.ib(type=Optional[int], default=None)
+ token_owner = attr.ib(type=str)
+
+ # Make the token owner default to the user ID, which is the common case.
+ @token_owner.default
+ def _default_token_owner(self):
+ return self.user_id
+
+
class RegistrationWorkerStore(CacheInvalidationWorkerStore):
def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
@@ -108,15 +139,13 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
return is_trial
@cached()
- async def get_user_by_access_token(self, token: str) -> Optional[dict]:
+ async def get_user_by_access_token(self, token: str) -> Optional[TokenLookupResult]:
"""Get a user from the given access token.
Args:
token: The access token of a user.
Returns:
- None, if the token did not match, otherwise dict
- including the keys `name`, `is_guest`, `device_id`, `token_id`,
- `valid_until_ms`.
+ None, if the token did not match, otherwise a `TokenLookupResult`
"""
return await self.db_pool.runInteraction(
"get_user_by_access_token", self._query_for_auth, token
@@ -224,7 +253,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
return [UserID.from_string(row[0]) for row in rows]
res = await self.db_pool.runInteraction(
- "get_expired_users", get_expired_users_txn, self.clock.time_msec()
+ "get_expired_users", get_expired_users_txn, self._clock.time_msec()
)
return res
@@ -390,7 +419,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
return {
user_id: {
"expired": (
- expiration is not None and self.clock.time_msec() >= expiration
+ expiration is not None and self._clock.time_msec() >= expiration
),
"deactivated": deactivated == 1,
}
@@ -434,23 +463,24 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
- def _query_for_auth(self, txn, token):
+ def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]:
sql = """
- SELECT users.name,
+ SELECT users.name as user_id,
users.is_guest,
users.shadow_banned,
access_tokens.id as token_id,
access_tokens.device_id,
- access_tokens.valid_until_ms
+ access_tokens.valid_until_ms,
+ access_tokens.user_id as token_owner
FROM users
- INNER JOIN access_tokens on users.name = access_tokens.user_id
+ INNER JOIN access_tokens on users.name = COALESCE(puppets_user_id, access_tokens.user_id)
WHERE token = ?
"""
txn.execute(sql, (token,))
rows = self.db_pool.cursor_to_dict(txn)
if rows:
- return rows[0]
+ return TokenLookupResult(**rows[0])
return None
diff --git a/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql b/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql
new file mode 100644
index 0000000000..00a9431a97
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Whether the access token is an admin token for controlling another user.
+ALTER TABLE access_tokens ADD COLUMN puppets_user_id TEXT;
diff --git a/synapse/types.py b/synapse/types.py
index 6c338fe0b7..5e52c1eb12 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -29,6 +29,7 @@ from typing import (
Tuple,
Type,
TypeVar,
+ Union,
)
import attr
@@ -39,6 +40,7 @@ from unpaddedbase64 import decode_base64
from synapse.api.errors import Codes, SynapseError
if TYPE_CHECKING:
+ from synapse.appservice.api import ApplicationService
from synapse.storage.databases.main import DataStore
# define a version of typing.Collection that works on python 3.5
@@ -75,6 +77,7 @@ class Requester(
"shadow_banned",
"device_id",
"app_service",
+ "authenticated_entity",
],
)
):
@@ -105,6 +108,7 @@ class Requester(
"shadow_banned": self.shadow_banned,
"device_id": self.device_id,
"app_server_id": self.app_service.id if self.app_service else None,
+ "authenticated_entity": self.authenticated_entity,
}
@staticmethod
@@ -130,16 +134,18 @@ class Requester(
shadow_banned=input["shadow_banned"],
device_id=input["device_id"],
app_service=appservice,
+ authenticated_entity=input["authenticated_entity"],
)
def create_requester(
- user_id,
- access_token_id=None,
- is_guest=False,
- shadow_banned=False,
- device_id=None,
- app_service=None,
+ user_id: Union[str, "UserID"],
+ access_token_id: Optional[int] = None,
+ is_guest: Optional[bool] = False,
+ shadow_banned: Optional[bool] = False,
+ device_id: Optional[str] = None,
+ app_service: Optional["ApplicationService"] = None,
+ authenticated_entity: Optional[str] = None,
):
"""
Create a new ``Requester`` object
@@ -152,14 +158,27 @@ def create_requester(
shadow_banned (bool): True if the user making this request is shadow-banned.
device_id (str|None): device_id which was set at authentication time
app_service (ApplicationService|None): the AS requesting on behalf of the user
+ authenticated_entity: The entity that authenticated when making the request.
+ This is different to the user_id when an admin user or the server is
+ "puppeting" the user.
Returns:
Requester
"""
if not isinstance(user_id, UserID):
user_id = UserID.from_string(user_id)
+
+ if authenticated_entity is None:
+ authenticated_entity = user_id.to_string()
+
return Requester(
- user_id, access_token_id, is_guest, shadow_banned, device_id, app_service
+ user_id,
+ access_token_id,
+ is_guest,
+ shadow_banned,
+ device_id,
+ app_service,
+ authenticated_entity,
)
|