diff --git a/CHANGES.md b/CHANGES.md
index 914690b7bb..14a025e03e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,20 @@
+Synapse 1.14.0rc2 (2020-05-27)
+==============================
+
+Bugfixes
+--------
+
+- Fix cache config to not apply cache factor to event cache. Regression in v1.14.0rc1. ([\#7578](https://github.com/matrix-org/synapse/issues/7578))
+- Fix bug where `ReplicationStreamer` was not always started when replication was enabled. Bug introduced in v1.14.0rc1. ([\#7579](https://github.com/matrix-org/synapse/issues/7579))
+- Fix specifying individual cache factors for caches with special characters in their name. Regression in v1.14.0rc1. ([\#7580](https://github.com/matrix-org/synapse/issues/7580))
+
+
+Improved Documentation
+----------------------
+
+- Fix the OIDC `client_auth_method` value in the sample config. ([\#7581](https://github.com/matrix-org/synapse/issues/7581))
+
+
Synapse 1.14.0rc1 (2020-05-26)
==============================
diff --git a/changelog.d/7578.bugfix b/changelog.d/7578.bugfix
deleted file mode 100644
index cd29307361..0000000000
--- a/changelog.d/7578.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix cache config to not apply cache factor to event cache. Regression in v1.14.0rc1.
diff --git a/changelog.d/7579.bugfix b/changelog.d/7579.bugfix
deleted file mode 100644
index 54542b6026..0000000000
--- a/changelog.d/7579.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix bug where `ReplicationStreamer` was not always started when replication was enabled. Bug introduced in v1.14.0rc1.
diff --git a/changelog.d/7580.bugfix b/changelog.d/7580.bugfix
deleted file mode 100644
index b255dc2a12..0000000000
--- a/changelog.d/7580.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix specifying individual cache factors for caches with special characters in their name. Regression in v1.14.0rc1.
diff --git a/changelog.d/7584.misc b/changelog.d/7584.misc
new file mode 100644
index 0000000000..55d1689f77
--- /dev/null
+++ b/changelog.d/7584.misc
@@ -0,0 +1 @@
+Speed up processing of federation stream RDATA rows.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 0ec482719d..ce2c235994 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -1546,7 +1546,7 @@ oidc_config:
# auth method to use when exchanging the token.
# Valid values are "client_secret_basic" (default), "client_secret_post" and "none".
#
- #client_auth_method: "client_auth_basic"
+ #client_auth_method: "client_secret_basic"
# list of scopes to ask. This should include the "openid" scope. Defaults to ["openid"].
#
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 6327147d8e..5e957985d7 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -36,7 +36,7 @@ try:
except ImportError:
pass
-__version__ = "1.14.0rc1"
+__version__ = "1.14.0rc2"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 5afe52f8d4..f3ec2a34ec 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -863,9 +863,24 @@ class FederationSenderHandler(object):
a FEDERATION_ACK back to the master, and stores the token that we have processed
in `federation_stream_position` so that we can restart where we left off.
"""
- try:
- self.federation_position = token
+ self.federation_position = token
+
+ # We save and send the ACK to master asynchronously, so we don't block
+ # processing on persistence. We don't need to do this operation for
+ # every single RDATA we receive, we just need to do it periodically.
+
+ if self._fed_position_linearizer.is_queued(None):
+ # There is already a task queued up to save and send the token, so
+ # no need to queue up another task.
+ return
+
+ run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
+ async def _save_and_send_ack(self):
+ """Save the current federation position in the database and send an ACK
+ to master with where we're up to.
+ """
+ try:
# We linearize here to ensure we don't have races updating the token
#
# XXX this appears to be redundant, since the ReplicationCommandHandler
@@ -875,16 +890,18 @@ class FederationSenderHandler(object):
# we're not being re-entered?
with (await self._fed_position_linearizer.queue(None)):
+ # We persist and ack the same position, so we take a copy of it
+ # here as otherwise it can get modified from underneath us.
+ current_position = self.federation_position
+
await self.store.update_federation_out_pos(
- "federation", self.federation_position
+ "federation", current_position
)
# We ACK this token over replication so that the master can drop
# its in memory queues
- self._hs.get_tcp_replication().send_federation_ack(
- self.federation_position
- )
- self._last_ack = self.federation_position
+ self._hs.get_tcp_replication().send_federation_ack(current_position)
+ self._last_ack = current_position
except Exception:
logger.exception("Error updating federation stream position")
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index 5af110745e..586038078f 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc_config.py
@@ -112,7 +112,7 @@ class OIDCConfig(Config):
# auth method to use when exchanging the token.
# Valid values are "client_secret_basic" (default), "client_secret_post" and "none".
#
- #client_auth_method: "client_auth_basic"
+ #client_auth_method: "client_secret_basic"
# list of scopes to ask. This should include the "openid" scope. Defaults to ["openid"].
#
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 581dffd8a0..f7af2bca7f 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -225,6 +225,18 @@ class Linearizer(object):
{}
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
+ def is_queued(self, key) -> bool:
+ """Checks whether there is a process queued up waiting
+ """
+ entry = self.key_to_defer.get(key)
+ if not entry:
+ # No entry so nothing is waiting.
+ return False
+
+ # There are waiting deferreds only in the OrderedDict of deferreds is
+ # non-empty.
+ return bool(entry[1])
+
def queue(self, key):
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py
index 852ef23185..ca3858b184 100644
--- a/tests/util/test_linearizer.py
+++ b/tests/util/test_linearizer.py
@@ -45,6 +45,38 @@ class LinearizerTestCase(unittest.TestCase):
with (yield d2):
pass
+ @defer.inlineCallbacks
+ def test_linearizer_is_queued(self):
+ linearizer = Linearizer()
+
+ key = object()
+
+ d1 = linearizer.queue(key)
+ cm1 = yield d1
+
+ # Since d1 gets called immediately, "is_queued" should return false.
+ self.assertFalse(linearizer.is_queued(key))
+
+ d2 = linearizer.queue(key)
+ self.assertFalse(d2.called)
+
+ # Now d2 is queued up behind successful completion of cm1
+ self.assertTrue(linearizer.is_queued(key))
+
+ with cm1:
+ self.assertFalse(d2.called)
+
+ # cm1 still not done, so d2 still queued.
+ self.assertTrue(linearizer.is_queued(key))
+
+ # And now d2 is called and nothing is in the queue again
+ self.assertFalse(linearizer.is_queued(key))
+
+ with (yield d2):
+ self.assertFalse(linearizer.is_queued(key))
+
+ self.assertFalse(linearizer.is_queued(key))
+
def test_lots_of_queued_things(self):
# we have one slow thing, and lots of fast things queued up behind it.
# it should *not* explode the stack.
|