From d689204aa6714fa12720b0ed3f5ea1c7f4baaf87 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jul 2024 11:52:14 +0100 Subject: Add docstring --- synapse/types/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 23ac1842f8..53401ade84 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1145,6 +1145,12 @@ class SlidingSyncStreamToken: This then looks something like: 5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379 + + Attributes: + stream_token: Token representing the position of all the standard + streams. + connection_token: Token used by sliding sync to track updates to any + per-connection state stored by Synapse. """ stream_token: StreamToken -- cgit 1.5.1 From 560087bf87f501145858b52e11b2dd4d6a555a53 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jul 2024 12:00:48 +0100 Subject: Remove '_token' prefix --- synapse/handlers/sliding_sync.py | 12 +++++------- synapse/types/__init__.py | 16 ++++++++-------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 3aa139ff1c..80ca3fa587 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -379,7 +379,7 @@ class SlidingSyncHandler: # this returns false, it means we timed out waiting, and we should # just return an empty response. before_wait_ts = self.clock.time_msec() - if not await self.notifier.wait_for_stream_token(from_token.stream_token): + if not await self.notifier.wait_for_stream_token(from_token.stream): logger.warning( "Timed out waiting for worker to catch up. Returning empty response" ) @@ -417,7 +417,7 @@ class SlidingSyncHandler: sync_config.user.to_string(), timeout_ms, current_sync_callback, - from_token=from_token.stream_token, + from_token=from_token.stream, ) return result @@ -459,7 +459,7 @@ class SlidingSyncHandler: await self.get_room_membership_for_user_at_to_token( user=sync_config.user, to_token=to_token, - from_token=from_token.stream_token if from_token else None, + from_token=from_token.stream if from_token else None, ) ) @@ -1414,7 +1414,7 @@ class SlidingSyncHandler: # - TODO: For an incremental sync where we haven't sent it down this # connection before to_bound = ( - from_token.stream_token.room_key + from_token.stream.room_key if from_token is not None and not room_membership_for_user_at_to_token.newly_joined else None @@ -1481,9 +1481,7 @@ class SlidingSyncHandler: instance_name=timeline_event.internal_metadata.instance_name, stream=timeline_event.internal_metadata.stream_ordering, ) - if persisted_position.persisted_after( - from_token.stream_token.room_key - ): + if persisted_position.persisted_after(from_token.stream.room_key): num_live += 1 else: # Since we're iterating over the timeline events in diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 53401ade84..a0956febc0 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1147,14 +1147,14 @@ class SlidingSyncStreamToken: 5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379 Attributes: - stream_token: Token representing the position of all the standard + stream: Token representing the position of all the standard streams. - connection_token: Token used by sliding sync to track updates to any + connection: Token used by sliding sync to track updates to any per-connection state stored by Synapse. """ - stream_token: StreamToken - connection_token: int + stream: StreamToken + connection: int @staticmethod @cancellable @@ -1166,8 +1166,8 @@ class SlidingSyncStreamToken: stream_token = await StreamToken.from_string(store, stream_token_str) return SlidingSyncStreamToken( - stream_token=stream_token, - connection_token=connection_token, + stream=stream_token, + connection=connection_token, ) except CancelledError: raise @@ -1176,8 +1176,8 @@ class SlidingSyncStreamToken: async def to_string(self, store: "DataStore") -> str: """Serializes the token to a string""" - stream_token_str = await self.stream_token.to_string(store) - return f"{self.connection_token}/{stream_token_str}" + stream_token_str = await self.stream.to_string(store) + return f"{self.connection}/{stream_token_str}" @attr.s(slots=True, frozen=True, auto_attribs=True) -- cgit 1.5.1 From 71d83477cb494db7f8384eedf127e59c8c8cd479 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 19 Jul 2024 11:02:38 +0100 Subject: Bump sentry-sdk from 2.6.0 to 2.8.0 (#17456) --- poetry.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index b3e45972f1..2bfcb59cf2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2430,13 +2430,13 @@ doc = ["Sphinx", "sphinx-rtd-theme"] [[package]] name = "sentry-sdk" -version = "2.6.0" +version = "2.8.0" description = "Python client for Sentry (https://sentry.io)" optional = true python-versions = ">=3.6" files = [ - {file = "sentry_sdk-2.6.0-py2.py3-none-any.whl", hash = "sha256:422b91cb49378b97e7e8d0e8d5a1069df23689d45262b86f54988a7db264e874"}, - {file = "sentry_sdk-2.6.0.tar.gz", hash = "sha256:65cc07e9c6995c5e316109f138570b32da3bd7ff8d0d0ee4aaf2628c3dd8127d"}, + {file = "sentry_sdk-2.8.0-py2.py3-none-any.whl", hash = "sha256:6051562d2cfa8087bb8b4b8b79dc44690f8a054762a29c07e22588b1f619bfb5"}, + {file = "sentry_sdk-2.8.0.tar.gz", hash = "sha256:aa4314f877d9cd9add5a0c9ba18e3f27f99f7de835ce36bd150e48a41c7c646f"}, ] [package.dependencies] @@ -2466,7 +2466,7 @@ langchain = ["langchain (>=0.0.210)"] loguru = ["loguru (>=0.5)"] openai = ["openai (>=1.0.0)", "tiktoken (>=0.3.0)"] opentelemetry = ["opentelemetry-distro (>=0.35b0)"] -opentelemetry-experimental = ["opentelemetry-distro (>=0.40b0,<1.0)", "opentelemetry-instrumentation-aiohttp-client (>=0.40b0,<1.0)", "opentelemetry-instrumentation-django (>=0.40b0,<1.0)", "opentelemetry-instrumentation-fastapi (>=0.40b0,<1.0)", "opentelemetry-instrumentation-flask (>=0.40b0,<1.0)", "opentelemetry-instrumentation-requests (>=0.40b0,<1.0)", "opentelemetry-instrumentation-sqlite3 (>=0.40b0,<1.0)", "opentelemetry-instrumentation-urllib (>=0.40b0,<1.0)"] +opentelemetry-experimental = ["opentelemetry-instrumentation-aio-pika (==0.46b0)", "opentelemetry-instrumentation-aiohttp-client (==0.46b0)", "opentelemetry-instrumentation-aiopg (==0.46b0)", "opentelemetry-instrumentation-asgi (==0.46b0)", "opentelemetry-instrumentation-asyncio (==0.46b0)", "opentelemetry-instrumentation-asyncpg (==0.46b0)", "opentelemetry-instrumentation-aws-lambda (==0.46b0)", "opentelemetry-instrumentation-boto (==0.46b0)", "opentelemetry-instrumentation-boto3sqs (==0.46b0)", "opentelemetry-instrumentation-botocore (==0.46b0)", "opentelemetry-instrumentation-cassandra (==0.46b0)", "opentelemetry-instrumentation-celery (==0.46b0)", "opentelemetry-instrumentation-confluent-kafka (==0.46b0)", "opentelemetry-instrumentation-dbapi (==0.46b0)", "opentelemetry-instrumentation-django (==0.46b0)", "opentelemetry-instrumentation-elasticsearch (==0.46b0)", "opentelemetry-instrumentation-falcon (==0.46b0)", "opentelemetry-instrumentation-fastapi (==0.46b0)", "opentelemetry-instrumentation-flask (==0.46b0)", "opentelemetry-instrumentation-grpc (==0.46b0)", "opentelemetry-instrumentation-httpx (==0.46b0)", "opentelemetry-instrumentation-jinja2 (==0.46b0)", "opentelemetry-instrumentation-kafka-python (==0.46b0)", "opentelemetry-instrumentation-logging (==0.46b0)", "opentelemetry-instrumentation-mysql (==0.46b0)", "opentelemetry-instrumentation-mysqlclient (==0.46b0)", "opentelemetry-instrumentation-pika (==0.46b0)", "opentelemetry-instrumentation-psycopg (==0.46b0)", "opentelemetry-instrumentation-psycopg2 (==0.46b0)", "opentelemetry-instrumentation-pymemcache (==0.46b0)", "opentelemetry-instrumentation-pymongo (==0.46b0)", "opentelemetry-instrumentation-pymysql (==0.46b0)", "opentelemetry-instrumentation-pyramid (==0.46b0)", "opentelemetry-instrumentation-redis (==0.46b0)", "opentelemetry-instrumentation-remoulade (==0.46b0)", "opentelemetry-instrumentation-requests (==0.46b0)", "opentelemetry-instrumentation-sklearn (==0.46b0)", "opentelemetry-instrumentation-sqlalchemy (==0.46b0)", "opentelemetry-instrumentation-sqlite3 (==0.46b0)", "opentelemetry-instrumentation-starlette (==0.46b0)", "opentelemetry-instrumentation-system-metrics (==0.46b0)", "opentelemetry-instrumentation-threading (==0.46b0)", "opentelemetry-instrumentation-tornado (==0.46b0)", "opentelemetry-instrumentation-tortoiseorm (==0.46b0)", "opentelemetry-instrumentation-urllib (==0.46b0)", "opentelemetry-instrumentation-urllib3 (==0.46b0)", "opentelemetry-instrumentation-wsgi (==0.46b0)"] pure-eval = ["asttokens", "executing", "pure-eval"] pymongo = ["pymongo (>=3.1)"] pyspark = ["pyspark (>=2.4.4)"] @@ -2476,7 +2476,7 @@ sanic = ["sanic (>=0.8)"] sqlalchemy = ["sqlalchemy (>=1.2)"] starlette = ["starlette (>=0.19.1)"] starlite = ["starlite (>=1.48)"] -tornado = ["tornado (>=5)"] +tornado = ["tornado (>=6)"] [[package]] name = "service-identity" -- cgit 1.5.1 From 43c865f7c98a1e31d6eb8d3979d1e199fadcb950 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jul 2024 12:09:39 +0100 Subject: Generate room sync data concurrently (#17458) This is also what we do for standard `/sync`. --- changelog.d/17458.misc | 1 + synapse/handlers/sliding_sync.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) create mode 100644 changelog.d/17458.misc diff --git a/changelog.d/17458.misc b/changelog.d/17458.misc new file mode 100644 index 0000000000..09cce15d0d --- /dev/null +++ b/changelog.d/17458.misc @@ -0,0 +1 @@ +Speed up generating sliding sync responses. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index a23a6b9dd9..423f0329d6 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -28,6 +28,7 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe from synapse.events import EventBase from synapse.events.utils import strip_event from synapse.handlers.relations import BundledAggregations +from synapse.logging.opentracing import start_active_span, tag_args, trace from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.databases.main.stream import CurrentStateDeltaMembership from synapse.storage.roommember import MemberSummary @@ -43,6 +44,7 @@ from synapse.types import ( ) from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult from synapse.types.state import StateFilter +from synapse.util.async_helpers import concurrently_execute from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -592,11 +594,14 @@ class SlidingSyncHandler: # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} - for room_id, room_sync_config in relevant_room_map.items(): + + @trace + @tag_args + async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( user=sync_config.user, room_id=room_id, - room_sync_config=room_sync_config, + room_sync_config=relevant_room_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ room_id ], @@ -606,6 +611,9 @@ class SlidingSyncHandler: rooms[room_id] = room_sync_result + with start_active_span("sliding_sync.generate_room_entries"): + await concurrently_execute(handle_room, relevant_room_map, 10) + extensions = await self.get_extensions_response( sync_config=sync_config, to_token=to_token ) -- cgit 1.5.1