summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-10-21 14:53:16 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-10-21 14:53:16 +0100
commite8ed9a6016d15907b2b7eb30fbf7e7831cbb1b94 (patch)
tree632f49eada8b4e8f15603fbd052ae3b1c53a2886
parentMerge commit 'f43c66d23' into anoa/dinsic_release_1_21_x (diff)
parentUpdate description of server_name config option (#8415) (diff)
downloadsynapse-e8ed9a6016d15907b2b7eb30fbf7e7831cbb1b94.tar.xz
Merge commit '8238b55e0' into anoa/dinsic_release_1_21_x
* commit '8238b55e0':
  Update description of server_name config option (#8415)
  Discard an empty upload_name before persisting an uploaded file (#7905)
  Don't table scan events on worker startup (#8419)
  Mypy fixes for `synapse.handlers.federation` (#8422)
-rw-r--r--changelog.d/7905.bugfix1
-rw-r--r--changelog.d/8415.doc1
-rw-r--r--changelog.d/8419.feature1
-rw-r--r--changelog.d/8422.misc1
-rw-r--r--docs/sample_config.yaml21
-rw-r--r--synapse/config/server.py21
-rw-r--r--synapse/federation/federation_client.py4
-rw-r--r--synapse/handlers/federation.py13
-rw-r--r--synapse/rest/media/v1/media_repository.py7
-rw-r--r--synapse/rest/media/v1/upload_resource.py4
-rw-r--r--synapse/storage/databases/state/store.py4
-rw-r--r--synapse/storage/persist_events.py2
-rw-r--r--synapse/storage/state.py6
-rw-r--r--synapse/storage/util/id_generators.py26
-rw-r--r--tests/storage/test_id_generators.py18
15 files changed, 107 insertions, 23 deletions
diff --git a/changelog.d/7905.bugfix b/changelog.d/7905.bugfix
new file mode 100644

index 0000000000..e60e624412 --- /dev/null +++ b/changelog.d/7905.bugfix
@@ -0,0 +1 @@ +Fix a longstanding bug when storing a media file with an empty `upload_name`. diff --git a/changelog.d/8415.doc b/changelog.d/8415.doc new file mode 100644
index 0000000000..28b5798533 --- /dev/null +++ b/changelog.d/8415.doc
@@ -0,0 +1 @@ +Improve description of `server_name` config option in `homserver.yaml`. \ No newline at end of file diff --git a/changelog.d/8419.feature b/changelog.d/8419.feature new file mode 100644
index 0000000000..b363e929ea --- /dev/null +++ b/changelog.d/8419.feature
@@ -0,0 +1 @@ +Add experimental support for sharding event persister. diff --git a/changelog.d/8422.misc b/changelog.d/8422.misc new file mode 100644
index 0000000000..03fba120c6 --- /dev/null +++ b/changelog.d/8422.misc
@@ -0,0 +1 @@ +Typing fixes for `synapse.handlers.federation`. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 76f588fa9f..4831484ef2 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml
@@ -33,10 +33,23 @@ ## Server ## -# The domain name of the server, with optional explicit port. -# This is used by remote servers to connect to this server, -# e.g. matrix.org, localhost:8080, etc. -# This is also the last part of your UserID. +# The public-facing domain of the server +# +# The server_name name will appear at the end of usernames and room addresses +# created on this server. For example if the server_name was example.com, +# usernames on this server would be in the format @user:example.com +# +# In most cases you should avoid using a matrix specific subdomain such as +# matrix.example.com or synapse.example.com as the server_name for the same +# reasons you wouldn't use user@email.example.com as your email address. +# See https://github.com/matrix-org/synapse/blob/master/docs/delegate.md +# for information on how to host Synapse on a subdomain while preserving +# a clean server_name. +# +# The server_name cannot be changed later so it is important to +# configure this correctly before you start Synapse. It should be all +# lowercase and may contain an explicit port. +# Examples: matrix.org, localhost:8080 # server_name: "SERVERNAME" diff --git a/synapse/config/server.py b/synapse/config/server.py
index eff2a846a4..9a5c7cb0ab 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py
@@ -647,10 +647,23 @@ class ServerConfig(Config): """\ ## Server ## - # The domain name of the server, with optional explicit port. - # This is used by remote servers to connect to this server, - # e.g. matrix.org, localhost:8080, etc. - # This is also the last part of your UserID. + # The public-facing domain of the server + # + # The server_name name will appear at the end of usernames and room addresses + # created on this server. For example if the server_name was example.com, + # usernames on this server would be in the format @user:example.com + # + # In most cases you should avoid using a matrix specific subdomain such as + # matrix.example.com or synapse.example.com as the server_name for the same + # reasons you wouldn't use user@email.example.com as your email address. + # See https://github.com/matrix-org/synapse/blob/master/docs/delegate.md + # for information on how to host Synapse on a subdomain while preserving + # a clean server_name. + # + # The server_name cannot be changed later so it is important to + # configure this correctly before you start Synapse. It should be all + # lowercase and may contain an explicit port. + # Examples: matrix.org, localhost:8080 # server_name: "%(server_name)s" diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 688d43fffb..302b2f69bc 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -24,10 +24,12 @@ from typing import ( Dict, Iterable, List, + Mapping, Optional, Sequence, Tuple, TypeVar, + Union, ) from prometheus_client import Counter @@ -501,7 +503,7 @@ class FederationClient(FederationBase): user_id: str, membership: str, content: dict, - params: Dict[str, str], + params: Optional[Mapping[str, Union[str, Iterable[str]]]], ) -> Tuple[str, EventBase, RoomVersion]: """ Creates an m.room.member event, with context, without participating in the room. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 33feb1b2ba..eb91f2f762 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -155,8 +155,9 @@ class FederationHandler(BaseHandler): self._device_list_updater = hs.get_device_handler().device_list_updater self._maybe_store_room_on_invite = self.store.maybe_store_room_on_invite - # When joining a room we need to queue any events for that room up - self.room_queues = {} + # When joining a room we need to queue any events for that room up. + # For each room, a list of (pdu, origin) tuples. + self.room_queues = {} # type: Dict[str, List[Tuple[EventBase, str]]] self._room_pdu_linearizer = Linearizer("fed_room_pdu") self.third_party_event_rules = hs.get_third_party_event_rules() @@ -817,6 +818,9 @@ class FederationHandler(BaseHandler): dest, room_id, limit=limit, extremities=extremities ) + if not events: + return [] + # ideally we'd sanity check the events here for excess prev_events etc, # but it's hard to reject events at this point without completely # breaking backfill in the same way that it is currently broken by @@ -2174,10 +2178,10 @@ class FederationHandler(BaseHandler): # given state at the event. This should correctly handle cases # like bans, especially with state res v2. - state_sets = await self.state_store.get_state_groups( + state_sets_d = await self.state_store.get_state_groups( event.room_id, extrem_ids ) - state_sets = list(state_sets.values()) + state_sets = list(state_sets_d.values()) # type: List[Iterable[EventBase]] state_sets.append(state) current_states = await self.state_handler.resolve_events( room_version, state_sets, event @@ -2968,6 +2972,7 @@ class FederationHandler(BaseHandler): ) return result["max_stream_id"] else: + assert self.storage.persistence max_stream_token = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled ) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 69f353d46f..ae6822d6e7 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py
@@ -139,7 +139,7 @@ class MediaRepository: async def create_content( self, media_type: str, - upload_name: str, + upload_name: Optional[str], content: IO, content_length: int, auth_user: str, @@ -147,8 +147,8 @@ class MediaRepository: """Store uploaded content for a local user and return the mxc URL Args: - media_type: The content type of the file - upload_name: The name of the file + media_type: The content type of the file. + upload_name: The name of the file, if provided. content: A file like object that is the content to store content_length: The length of the content auth_user: The user_id of the uploader @@ -156,6 +156,7 @@ class MediaRepository: Returns: The mxc url of the stored content """ + media_id = random_string(24) file_info = FileInfo(server_name=None, file_id=media_id) diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index 3ebf7a68e6..d76f7389e1 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py
@@ -63,6 +63,10 @@ class UploadResource(DirectServeJsonResource): msg="Invalid UTF-8 filename parameter: %r" % (upload_name), code=400 ) + # If the name is falsey (e.g. an empty byte string) ensure it is None. + else: + upload_name = None + headers = request.requestHeaders if headers.hasHeader(b"Content-Type"): diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 989f0cbc9d..0e31cc811a 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -24,7 +24,7 @@ from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStor from synapse.storage.state import StateFilter from synapse.storage.types import Cursor from synapse.storage.util.sequence import build_sequence_generator -from synapse.types import StateMap +from synapse.types import MutableStateMap, StateMap from synapse.util.caches.descriptors import cached from synapse.util.caches.dictionary_cache import DictionaryCache @@ -208,7 +208,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): async def _get_state_for_groups( self, groups: Iterable[int], state_filter: StateFilter = StateFilter.all() - ) -> Dict[int, StateMap[str]]: + ) -> Dict[int, MutableStateMap[str]]: """Gets the state at each of a list of state groups, optionally filtering by type/state_key diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 603cd7d825..ded6cf9655 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py
@@ -197,7 +197,7 @@ class EventsPersistenceStorage: async def persist_events( self, - events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: Iterable[Tuple[EventBase, EventContext]], backfilled: bool = False, ) -> RoomStreamToken: """ diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 8f68d968f0..08a69f2f96 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py
@@ -20,7 +20,7 @@ import attr from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.types import StateMap +from synapse.types import MutableStateMap, StateMap logger = logging.getLogger(__name__) @@ -349,7 +349,7 @@ class StateGroupStorage: async def get_state_groups_ids( self, _room_id: str, event_ids: Iterable[str] - ) -> Dict[int, StateMap[str]]: + ) -> Dict[int, MutableStateMap[str]]: """Get the event IDs of all the state for the state groups for the given events Args: @@ -532,7 +532,7 @@ class StateGroupStorage: def _get_state_for_groups( self, groups: Iterable[int], state_filter: StateFilter = StateFilter.all() - ) -> Awaitable[Dict[int, StateMap[str]]]: + ) -> Awaitable[Dict[int, MutableStateMap[str]]]: """Gets the state at each of a list of state groups, optionally filtering by type/state_key diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 4fd7573e26..02fbb656e8 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py
@@ -273,6 +273,19 @@ class MultiWriterIdGenerator: # Load the current positions of all writers for the stream. if self._writers: + # We delete any stale entries in the positions table. This is + # important if we add back a writer after a long time; we want to + # consider that a "new" writer, rather than using the old stale + # entry here. + sql = """ + DELETE FROM stream_positions + WHERE + stream_name = ? + AND instance_name != ALL(?) + """ + sql = self._db.engine.convert_param_style(sql) + cur.execute(sql, (self._stream_name, self._writers)) + sql = """ SELECT instance_name, stream_id FROM stream_positions WHERE stream_name = ? @@ -453,11 +466,22 @@ class MultiWriterIdGenerator: """Returns the position of the given writer. """ + # If we don't have an entry for the given instance name, we assume it's a + # new writer. + # + # For new writers we assume their initial position to be the current + # persisted up to position. This stops Synapse from doing a full table + # scan when a new writer announces itself over replication. with self._lock: - return self._return_factor * self._current_positions.get(instance_name, 0) + return self._return_factor * self._current_positions.get( + instance_name, self._persisted_upto_position + ) def get_positions(self) -> Dict[str, int]: """Get a copy of the current positon map. + + Note that this won't necessarily include all configured writers if some + writers haven't written anything yet. """ with self._lock: diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py
index 4558bee7be..392b08832b 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py
@@ -390,17 +390,28 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): # Initial config has two writers id_gen = self._create_id_generator("first", writers=["first", "second"]) self.assertEqual(id_gen.get_persisted_upto_position(), 3) + self.assertEqual(id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(id_gen.get_current_token_for_writer("second"), 5) # New config removes one of the configs. Note that if the writer is # removed from config we assume that it has been shut down and has # finished persisting, hence why the persisted upto position is 5. id_gen_2 = self._create_id_generator("second", writers=["second"]) self.assertEqual(id_gen_2.get_persisted_upto_position(), 5) + self.assertEqual(id_gen_2.get_current_token_for_writer("second"), 5) # This config points to a single, previously unused writer. id_gen_3 = self._create_id_generator("third", writers=["third"]) self.assertEqual(id_gen_3.get_persisted_upto_position(), 5) + # For new writers we assume their initial position to be the current + # persisted up to position. This stops Synapse from doing a full table + # scan when a new writer comes along. + self.assertEqual(id_gen_3.get_current_token_for_writer("third"), 5) + + id_gen_4 = self._create_id_generator("fourth", writers=["third"]) + self.assertEqual(id_gen_4.get_current_token_for_writer("third"), 5) + # Check that we get a sane next stream ID with this new config. async def _get_next_async(): @@ -410,6 +421,13 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): self.get_success(_get_next_async()) self.assertEqual(id_gen_3.get_persisted_upto_position(), 6) + # If we add back the old "first" then we shouldn't see the persisted up + # to position revert back to 3. + id_gen_5 = self._create_id_generator("five", writers=["first", "third"]) + self.assertEqual(id_gen_5.get_persisted_upto_position(), 6) + self.assertEqual(id_gen_5.get_current_token_for_writer("first"), 6) + self.assertEqual(id_gen_5.get_current_token_for_writer("third"), 6) + def test_sequence_consistency(self): """Test that we error out if the table and sequence diverges. """