diff --git a/synapse/__init__.py b/synapse/__init__.py
index d9843a1708..445e8a5cad 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
-__version__ = "1.35.0"
+__version__ = "1.35.1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 49ed7cabcc..f3f97db2fa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -3056,8 +3056,9 @@ class FederationHandler(BaseHandler):
"""
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
- # Limit the number of events sent over federation.
- for batch in batch_iter(event_and_contexts, 1000):
+ # Limit the number of events sent over replication. We choose 200
+ # here as that is what we default to in `max_request_body_size(..)`
+ for batch in batch_iter(event_and_contexts, 200):
result = await self._send_events(
instance_name=instance,
store=self.store,
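The batching above uses `batch_iter` from `synapse.util.iterutils`. For reference, a minimal sketch of that chunking pattern (the standard `islice` idiom, not necessarily Synapse's exact implementation):

```python
from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    """Yield tuples of at most `size` items until the iterable is exhausted."""
    source = iter(iterable)
    # islice() pulls up to `size` items per call; iter() with an empty-tuple
    # sentinel stops once the source runs dry.
    return iter(lambda: tuple(islice(source, size)), ())
```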
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index f64845b80c..26c8ffe780 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -271,6 +271,12 @@ class SynapseTags:
# HTTP request tag (used to distinguish full vs incremental syncs, etc)
REQUEST_TAG = "request_tag"
+ # Text description of a database transaction
+ DB_TXN_DESC = "db.txn_desc"
+
+ # Unique-ish ID of a database transaction
+ DB_TXN_ID = "db.txn_id"
+
# Block everything by default
# A regex which matches the server_names to expose traces for.
@@ -356,10 +362,13 @@ def init_tracer(hs: "HomeServer"):
set_homeserver_whitelist(hs.config.opentracer_whitelist)
+ from jaeger_client.metrics.prometheus import PrometheusMetricsFactory
+
config = JaegerConfig(
config=hs.config.jaeger_config,
service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()),
scope_manager=LogContextScopeManager(hs.config),
+ metrics_factory=PrometheusMetricsFactory(),
)
# If we have the rust jaeger reporter available let's use that.
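The two new tags above are attached to spans in `synapse/storage/database.py` further down. A minimal sketch of an equivalent span through the plain `opentracing` API (the tag values are illustrative):

```python
import opentracing

tracer = opentracing.global_tracer()  # no-op tracer unless Jaeger is initialised

with tracer.start_active_span("db.txn") as scope:
    # db.txn_desc carries the human-readable description of the transaction,
    # db.txn_id the unique-ish name Synapse uses in its logs.
    scope.span.set_tag("db.txn_desc", "persist_events")   # illustrative value
    scope.span.set_tag("db.txn_id", "persist_events-17")  # illustrative value
```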
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 24b4e6649f..3c3cc47631 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -485,21 +485,21 @@ class Notifier:
end_time = self.clock.time_msec() + timeout
while not result:
- try:
- now = self.clock.time_msec()
- if end_time <= now:
- break
-
- # Now we wait for the _NotifierUserStream to be told there
- # is a new token.
- listener = user_stream.new_listener(prev_token)
- listener.deferred = timeout_deferred(
- listener.deferred,
- (end_time - now) / 1000.0,
- self.hs.get_reactor(),
- )
+ with start_active_span("wait_for_events"):
+ try:
+ now = self.clock.time_msec()
+ if end_time <= now:
+ break
+
+ # Now we wait for the _NotifierUserStream to be told there
+ # is a new token.
+ listener = user_stream.new_listener(prev_token)
+ listener.deferred = timeout_deferred(
+ listener.deferred,
+ (end_time - now) / 1000.0,
+ self.hs.get_reactor(),
+ )
- with start_active_span("wait_for_events.deferred"):
log_kv(
{
"wait_for_events": "sleep",
@@ -517,27 +517,27 @@ class Notifier:
}
)
- current_token = user_stream.current_token
+ current_token = user_stream.current_token
- result = await callback(prev_token, current_token)
- log_kv(
- {
- "wait_for_events": "result",
- "result": bool(result),
- }
- )
- if result:
+ result = await callback(prev_token, current_token)
+ log_kv(
+ {
+ "wait_for_events": "result",
+ "result": bool(result),
+ }
+ )
+ if result:
+ break
+
+ # Update the prev_token to the current_token since nothing
+ # has happened between the old prev_token and the current_token
+ prev_token = current_token
+ except defer.TimeoutError:
+ log_kv({"wait_for_events": "timeout"})
+ break
+ except defer.CancelledError:
+ log_kv({"wait_for_events": "cancelled"})
break
-
- # Update the prev_token to the current_token since nothing
- # has happened between the old prev_token and the current_token
- prev_token = current_token
- except defer.TimeoutError:
- log_kv({"wait_for_events": "timeout"})
- break
- except defer.CancelledError:
- log_kv({"wait_for_events": "cancelled"})
- break
if result is None:
# This happened if there was no timeout or if the timeout had
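The re-indentation above widens the span from the timed sleep alone (`wait_for_events.deferred`) to the whole loop iteration: listener setup, sleep, callback and timeout handling now all land under one `wait_for_events` span. A self-contained toy sketch of that structure (the `start_active_span` stand-in is a placeholder, not Synapse's helper):

```python
import time
from contextlib import contextmanager

@contextmanager
def start_active_span(name):
    # Placeholder for synapse.logging.opentracing.start_active_span.
    print(f"span start: {name}")
    try:
        yield
    finally:
        print(f"span end: {name}")

def wait_for_events(poll, deadline):
    """One span per iteration, covering both the poll and the deadline check."""
    result = None
    while not result:
        with start_active_span("wait_for_events"):
            if time.monotonic() >= deadline:
                break
            result = poll()
    return result
```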
diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
index 2c71af4279..b68db2c57c 100644
--- a/synapse/rest/admin/media.py
+++ b/synapse/rest/admin/media.py
@@ -120,6 +120,35 @@ class QuarantineMediaByID(RestServlet):
return 200, {}
+class UnquarantineMediaByID(RestServlet):
+ """Quarantines local or remote media by a given ID so that no one can download
+ it via this server.
+ """
+
+ PATTERNS = admin_patterns(
+ "/media/unquarantine/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)"
+ )
+
+ def __init__(self, hs: "HomeServer"):
+ self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+
+ async def on_POST(
+ self, request: SynapseRequest, server_name: str, media_id: str
+ ) -> Tuple[int, JsonDict]:
+ requester = await self.auth.get_user_by_req(request)
+ await assert_user_is_admin(self.auth, requester.user)
+
+ logging.info(
+ "Remove from quarantine local media by ID: %s/%s", server_name, media_id
+ )
+
+ # Remove this media ID from quarantine
+ await self.store.quarantine_media_by_id(server_name, media_id, None)
+
+ return 200, {}
+
+
class ProtectMediaByID(RestServlet):
"""Protect local media from being quarantined."""
@@ -290,6 +319,7 @@ def register_servlets_for_media_repo(hs: "HomeServer", http_server):
PurgeMediaCacheRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
QuarantineMediaByID(hs).register(http_server)
+ UnquarantineMediaByID(hs).register(http_server)
QuarantineMediaByUser(hs).register(http_server)
ProtectMediaByID(hs).register(http_server)
UnprotectMediaByID(hs).register(http_server)
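A hedged usage sketch for the new endpoint (homeserver URL, server name, media ID and token are placeholders; `/_synapse/admin/v1` is the prefix `admin_patterns` resolves to by default):

```python
import requests

# POST /_synapse/admin/v1/media/unquarantine/<server_name>/<media_id>
resp = requests.post(
    "https://matrix.example.com/_synapse/admin/v1/media/unquarantine/"
    "matrix.example.com/abcdefghijklmnop",
    headers={"Authorization": "Bearer <admin_access_token>"},
)
resp.raise_for_status()
print(resp.json())  # {} on success
```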
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 70286b0ff7..5a9c27f75f 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -910,7 +910,7 @@ class RoomAliasListServlet(RestServlet):
r"^/_matrix/client/unstable/org\.matrix\.msc2432"
r"/rooms/(?P<room_id>[^/]*)/aliases"
),
- ]
+ ] + list(client_patterns("/rooms/(?P<room_id>[^/]*)/aliases$", unstable=False))
def __init__(self, hs: "HomeServer"):
super().__init__()
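This exposes the aliases endpoint on the stable client prefix alongside the existing MSC2432 unstable one. A hedged sketch of the two paths (homeserver, room ID and token are placeholders; it assumes `r0` is among the default releases for `client_patterns` at this version):

```python
import requests

BASE = "https://matrix.example.com"
HEADERS = {"Authorization": "Bearer <access_token>"}
ROOM = "%21someroom%3Amatrix.example.com"  # URL-encoded !someroom:matrix.example.com

# Stable path added by this change:
stable = requests.get(f"{BASE}/_matrix/client/r0/rooms/{ROOM}/aliases", headers=HEADERS)

# Pre-existing unstable MSC2432 path:
unstable = requests.get(
    f"{BASE}/_matrix/client/unstable/org.matrix.msc2432/rooms/{ROOM}/aliases",
    headers=HEADERS,
)
print(stable.json(), unstable.json())  # both return {"aliases": [...]}
```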
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index a761ad603b..974703d13a 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -40,6 +40,7 @@ from twisted.enterprise import adbapi
from synapse.api.errors import StoreError
from synapse.config.database import DatabaseConnectionConfig
+from synapse.logging import opentracing
from synapse.logging.context import (
LoggingContext,
current_context,
@@ -313,7 +314,14 @@ class LoggingTransaction:
start = time.time()
try:
- return func(sql, *args)
+ with opentracing.start_active_span(
+ "db.query",
+ tags={
+ opentracing.tags.DATABASE_TYPE: "sql",
+ opentracing.tags.DATABASE_STATEMENT: sql,
+ },
+ ):
+ return func(sql, *args)
except Exception as e:
sql_logger.debug("[SQL FAIL] {%s} %s", self.name, e)
raise
@@ -525,9 +533,16 @@ class DatabasePool:
exception_callbacks=exception_callbacks,
)
try:
- r = func(cursor, *args, **kwargs)
- conn.commit()
- return r
+ with opentracing.start_active_span(
+ "db.txn",
+ tags={
+ opentracing.SynapseTags.DB_TXN_DESC: desc,
+ opentracing.SynapseTags.DB_TXN_ID: name,
+ },
+ ):
+ r = func(cursor, *args, **kwargs)
+ conn.commit()
+ return r
except self.engine.module.OperationalError as e:
# This can happen if the database disappears mid
# transaction.
@@ -653,16 +668,17 @@ class DatabasePool:
logger.warning("Starting db txn '%s' from sentinel context", desc)
try:
- result = await self.runWithConnection(
- self.new_transaction,
- desc,
- after_callbacks,
- exception_callbacks,
- func,
- *args,
- db_autocommit=db_autocommit,
- **kwargs,
- )
+ with opentracing.start_active_span(f"db.{desc}"):
+ result = await self.runWithConnection(
+ self.new_transaction,
+ desc,
+ after_callbacks,
+ exception_callbacks,
+ func,
+ *args,
+ db_autocommit=db_autocommit,
+ **kwargs,
+ )
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
@@ -718,25 +734,29 @@ class DatabasePool:
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
- sched_duration_sec = monotonic_time() - start_time
- sql_scheduling_timer.observe(sched_duration_sec)
- context.add_database_scheduled(sched_duration_sec)
-
- if self.engine.is_connection_closed(conn):
- logger.debug("Reconnecting closed database connection")
- conn.reconnect()
-
- try:
- if db_autocommit:
- self.engine.attempt_to_set_autocommit(conn, True)
-
- db_conn = LoggingDatabaseConnection(
- conn, self.engine, "runWithConnection"
- )
- return func(db_conn, *args, **kwargs)
- finally:
- if db_autocommit:
- self.engine.attempt_to_set_autocommit(conn, False)
+ with opentracing.start_active_span(
+ operation_name="db.connection",
+ ):
+ sched_duration_sec = monotonic_time() - start_time
+ sql_scheduling_timer.observe(sched_duration_sec)
+ context.add_database_scheduled(sched_duration_sec)
+
+ if self.engine.is_connection_closed(conn):
+ logger.debug("Reconnecting closed database connection")
+ conn.reconnect()
+ opentracing.log_kv({"message": "reconnected"})
+
+ try:
+ if db_autocommit:
+ self.engine.attempt_to_set_autocommit(conn, True)
+
+ db_conn = LoggingDatabaseConnection(
+ conn, self.engine, "runWithConnection"
+ )
+ return func(db_conn, *args, **kwargs)
+ finally:
+ if db_autocommit:
+ self.engine.attempt_to_set_autocommit(conn, False)
return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
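Taken together, the three instrumentation points in this file produce one nested trace per database interaction. A runnable sketch of the hierarchy using the plain `opentracing` no-op tracer (span names as in the diff; `persist_events` is an illustrative desc):

```python
import opentracing

tracer = opentracing.global_tracer()

# runInteraction wraps everything in f"db.{desc}" ...
with tracer.start_active_span("db.persist_events"):
    # ... runWithConnection accounts for time holding a pooled connection ...
    with tracer.start_active_span("db.connection"):
        # ... new_transaction covers one BEGIN/COMMIT cycle ...
        with tracer.start_active_span("db.txn"):
            # ... and each execute() gets a db.query span tagged with the SQL.
            with tracer.start_active_span("db.query") as scope:
                scope.span.set_tag("db.statement", "SELECT 1")
```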
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 0cf450f81d..2a96bcd314 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -764,14 +764,15 @@ class RoomWorkerStore(SQLBaseStore):
self,
server_name: str,
media_id: str,
- quarantined_by: str,
+ quarantined_by: Optional[str],
) -> int:
- """quarantines a single local or remote media id
+ """quarantines or unquarantines a single local or remote media id
Args:
server_name: The name of the server that holds this media
media_id: The ID of the media to be quarantined
quarantined_by: The user ID that initiated the quarantine request
+ If it is `None`, the media will be removed from quarantine.
"""
logger.info("Quarantining media: %s/%s", server_name, media_id)
is_local = server_name == self.config.server_name
@@ -838,9 +839,9 @@ class RoomWorkerStore(SQLBaseStore):
txn,
local_mxcs: List[str],
remote_mxcs: List[Tuple[str, str]],
- quarantined_by: str,
+ quarantined_by: Optional[str],
) -> int:
- """Quarantine local and remote media items
+ """Quarantine and unquarantine local and remote media items
Args:
txn (cursor)
@@ -848,18 +849,27 @@ class RoomWorkerStore(SQLBaseStore):
remote_mxcs: A list of (remote server, media id) tuples representing
remote mxc URLs
quarantined_by: The ID of the user who initiated the quarantine request
+ If it is `None`, the media will be removed from quarantine.
Returns:
The total number of media items quarantined
"""
+
# Update all the tables to set the quarantined_by flag
- txn.executemany(
- """
+ sql = """
UPDATE local_media_repository
SET quarantined_by = ?
- WHERE media_id = ? AND safe_from_quarantine = ?
- """,
- ((quarantined_by, media_id, False) for media_id in local_mxcs),
- )
+ WHERE media_id = ?
+ """
+
+ # set quarantine
+ if quarantined_by is not None:
+ sql += "AND safe_from_quarantine = ?"
+ rows = [(quarantined_by, media_id, False) for media_id in local_mxcs]
+ # remove from quarantine
+ else:
+ rows = [(quarantined_by, media_id) for media_id in local_mxcs]
+
+ txn.executemany(sql, rows)
# Note that a rowcount of -1 can be used to indicate no rows were affected.
total_media_quarantined = txn.rowcount if txn.rowcount > 0 else 0
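For clarity, a sketch of the two statements the branch above executes per local media ID (parameter values are illustrative):

```python
# quarantined_by is a user ID -> quarantine, honouring the protection flag:
#   UPDATE local_media_repository SET quarantined_by = ?
#   WHERE media_id = ? AND safe_from_quarantine = ?
#   params: ("@admin:example.com", "<media_id>", False)
#
# quarantined_by is None -> unquarantine; the safe_from_quarantine guard is
# dropped so the UPDATE simply clears the flag:
#   UPDATE local_media_repository SET quarantined_by = ?
#   WHERE media_id = ?
#   params: (None, "<media_id>")
```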
|