diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 71f8d43a76..995d4764a9 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -15,8 +15,6 @@
import logging
-from six import iteritems
-
from twisted.internet import defer
from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -421,7 +419,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
):
self.database_engine.lock_table(txn, "user_ips")
- for entry in iteritems(to_update):
+ for entry in to_update.items():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
try:
@@ -530,7 +528,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
"user_agent": user_agent,
"last_seen": last_seen,
}
- for (access_token, ip), (user_agent, last_seen) in iteritems(results)
+ for (access_token, ip), (user_agent, last_seen) in results.items()
]
@wrap_as_background_process("prune_old_user_ips")
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index fb9f798e29..0ff0542453 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -17,8 +17,6 @@
import logging
from typing import List, Optional, Set, Tuple
-from six import iteritems
-
from canonicaljson import json
from twisted.internet import defer
@@ -208,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore):
)
# add the updated cross-signing keys to the results list
- for user_id, result in iteritems(cross_signing_keys_by_user):
+ for user_id, result in cross_signing_keys_by_user.items():
result["user_id"] = user_id
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
results.append(("org.matrix.signing_key_update", result))
@@ -269,7 +267,7 @@ class DeviceWorkerStore(SQLBaseStore):
)
results = []
- for user_id, user_devices in iteritems(devices):
+ for user_id, user_devices in devices.items():
# The prev_id for the first row is always the last row before
# `from_stream_id`
prev_id = yield self._get_last_device_update_for_remote_user(
@@ -493,7 +491,7 @@ class DeviceWorkerStore(SQLBaseStore):
if devices:
user_devices = devices[user_id]
results = []
- for device_id, device in iteritems(user_devices):
+ for device_id, device in user_devices.items():
result = {"device_id": device_id}
key_json = device.get("key_json", None)
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 20698bfd16..1a0842d4b0 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -16,8 +16,6 @@
# limitations under the License.
from typing import Dict, List
-from six import iteritems
-
from canonicaljson import encode_canonical_json, json
from twisted.enterprise.adbapi import Connection
@@ -64,9 +62,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
# Build the result structure, un-jsonify the results, and add the
# "unsigned" section
rv = {}
- for user_id, device_keys in iteritems(results):
+ for user_id, device_keys in results.items():
rv[user_id] = {}
- for device_id, device_info in iteritems(device_keys):
+ for device_id, device_info in device_keys.items():
r = db_to_json(device_info.pop("key_json"))
r["unsigned"] = {}
display_name = device_info["device_display_name"]
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 24ce8c4330..a6bb3221ff 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,10 +14,9 @@
# limitations under the License.
import itertools
import logging
+from queue import Empty, PriorityQueue
from typing import Dict, List, Optional, Set, Tuple
-from six.moves.queue import Empty, PriorityQueue
-
from twisted.internet import defer
from synapse.api.errors import StoreError
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 0321274de2..bc9f4f08ea 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -16,8 +16,6 @@
import logging
-from six import iteritems
-
from canonicaljson import json
from twisted.internet import defer
@@ -455,7 +453,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
sql,
(
_gen_entry(user_id, actions)
- for user_id, actions in iteritems(user_id_actions)
+ for user_id, actions in user_id_actions.items()
),
)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a6572571b4..cfd24d2f06 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -21,9 +21,6 @@ from collections import OrderedDict, namedtuple
from functools import wraps
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
-from six import integer_types, iteritems, text_type
-from six.moves import range
-
import attr
from canonicaljson import json
from prometheus_client import Counter
@@ -232,10 +229,10 @@ class PersistEventsStore:
event_counter.labels(event.type, origin_type, origin_entity).inc()
- for room_id, new_state in iteritems(current_state_for_room):
+ for room_id, new_state in current_state_for_room.items():
self.store.get_current_state_ids.prefill((room_id,), new_state)
- for room_id, latest_event_ids in iteritems(new_forward_extremeties):
+ for room_id, latest_event_ids in new_forward_extremeties.items():
self.store.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids)
)
@@ -461,7 +458,7 @@ class PersistEventsStore:
state_delta_by_room: Dict[str, DeltaState],
stream_id: int,
):
- for room_id, delta_state in iteritems(state_delta_by_room):
+ for room_id, delta_state in state_delta_by_room.items():
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
@@ -545,7 +542,7 @@ class PersistEventsStore:
""",
[
(room_id, key[0], key[1], ev_id, ev_id)
- for key, ev_id in iteritems(to_insert)
+ for key, ev_id in to_insert.items()
],
)
@@ -642,7 +639,7 @@ class PersistEventsStore:
def _update_forward_extremities_txn(
self, txn, new_forward_extremities, max_stream_order
):
- for room_id, new_extrem in iteritems(new_forward_extremities):
+ for room_id, new_extrem in new_forward_extremities.items():
self.db.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
@@ -655,7 +652,7 @@ class PersistEventsStore:
table="event_forward_extremities",
values=[
{"event_id": ev_id, "room_id": room_id}
- for room_id, new_extrem in iteritems(new_forward_extremities)
+ for room_id, new_extrem in new_forward_extremities.items()
for ev_id in new_extrem
],
)
@@ -672,7 +669,7 @@ class PersistEventsStore:
"event_id": event_id,
"stream_ordering": max_stream_order,
}
- for room_id, new_extrem in iteritems(new_forward_extremities)
+ for room_id, new_extrem in new_forward_extremities.items()
for event_id in new_extrem
],
)
@@ -727,7 +724,7 @@ class PersistEventsStore:
event.depth, depth_updates.get(event.room_id, event.depth)
)
- for room_id, depth in iteritems(depth_updates):
+ for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn(self, txn, events_and_contexts):
@@ -893,8 +890,7 @@ class PersistEventsStore:
"received_ts": self._clock.time_msec(),
"sender": event.sender,
"contains_url": (
- "url" in event.content
- and isinstance(event.content["url"], text_type)
+ "url" in event.content and isinstance(event.content["url"], str)
),
}
for event, _ in events_and_contexts
@@ -1345,10 +1341,10 @@ class PersistEventsStore:
):
if (
"min_lifetime" in event.content
- and not isinstance(event.content.get("min_lifetime"), integer_types)
+ and not isinstance(event.content.get("min_lifetime"), int)
) or (
"max_lifetime" in event.content
- and not isinstance(event.content.get("max_lifetime"), integer_types)
+ and not isinstance(event.content.get("max_lifetime"), int)
):
# Ignore the event if one of the value isn't an integer.
return
@@ -1497,11 +1493,11 @@ class PersistEventsStore:
table="event_to_state_groups",
values=[
{"state_group": state_group_id, "event_id": event_id}
- for event_id, state_group_id in iteritems(state_groups)
+ for event_id, state_group_id in state_groups.items()
],
)
- for event_id, state_group_id in iteritems(state_groups):
+ for event_id, state_group_id in state_groups.items():
txn.call_after(
self.store._get_state_group_for_event.prefill,
(event_id,),
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index f54c8b1ee0..62d28f44dc 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -15,8 +15,6 @@
import logging
-from six import text_type
-
from canonicaljson import json
from twisted.internet import defer
@@ -133,7 +131,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
contains_url = "url" in content
if contains_url:
- contains_url &= isinstance(content["url"], text_type)
+ contains_url &= isinstance(content["url"], str)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 213d69100a..a48c7a96ca 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1077,9 +1077,32 @@ class EventsWorkerStore(SQLBaseStore):
"get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)
- def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+ async def get_all_new_backfill_event_rows(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for backfill replication stream, including all new
+ backfilled events and events that have gone from being outliers to not.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
if last_id == current_id:
- return defer.succeed([])
+ return [], current_id, False
def get_all_new_backfill_event_rows(txn):
sql = (
@@ -1094,10 +1117,12 @@ class EventsWorkerStore(SQLBaseStore):
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, limit))
- new_event_updates = txn.fetchall()
+ new_event_updates = [(row[0], row[1:]) for row in txn]
+ limited = False
if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
+ limited = True
else:
upper_bound = current_id
@@ -1114,11 +1139,15 @@ class EventsWorkerStore(SQLBaseStore):
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound))
- new_event_updates.extend(txn.fetchall())
+ new_event_updates.extend((row[0], row[1:]) for row in txn)
- return new_event_updates
+ if len(new_event_updates) >= limit:
+ upper_bound = new_event_updates[-1][0]
+ limited = True
- return self.db.runInteraction(
+ return new_event_updates, upper_bound, limited
+
+ return await self.db.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 8aecd414c2..15bc13cbd0 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -81,6 +81,15 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_local_media",
)
+ def mark_local_media_as_safe(self, media_id: str):
+ """Mark a local media as safe from quarantining."""
+ return self.db.simple_update_one(
+ table="local_media_repository",
+ keyvalues={"media_id": media_id},
+ updatevalues={"safe_from_quarantine": True},
+ desc="mark_local_media_as_safe",
+ )
+
def get_url_cache(self, url, ts):
"""Get the media_id and ts for a cached URL as of the given timestamp
Returns:
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index dab31e0c2d..7574612619 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import List, Tuple
+
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore):
)
txn.execute(sql + clause, [stream_id] + list(args))
- def get_all_presence_updates(self, last_id, current_id, limit):
+ async def get_all_presence_updates(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for presence replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
if last_id == current_id:
- return defer.succeed([])
+ return [], current_id, False
def get_all_presence_updates_txn(txn):
sql = """
@@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore):
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
- return txn.fetchall()
+ updates = [(row[0], row[1:]) for row in txn]
+
+ upper_bound = current_id
+ limited = False
+ if len(updates) >= limit:
+ upper_bound = updates[-1][0]
+ limited = True
+
+ return updates, upper_bound, limited
- return self.db.runInteraction(
+ return await self.db.runInteraction(
"get_all_presence_updates", get_all_presence_updates_txn
)
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index ef8f40959f..f6e78ca590 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -16,7 +16,7 @@
import abc
import logging
-from typing import Union
+from typing import List, Tuple, Union
from canonicaljson import json
@@ -348,23 +348,53 @@ class PushRulesWorkerStore(
results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
return results
- def get_all_push_rule_updates(self, last_id, current_id, limit):
- """Get all the push rules changes that have happend on the server"""
+ async def get_all_push_rule_updates(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+ """Get updates for push_rules replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
if last_id == current_id:
- return defer.succeed([])
+ return [], current_id, False
def get_all_push_rule_updates_txn(txn):
- sql = (
- "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
- " op, priority_class, priority, conditions, actions"
- " FROM push_rules_stream"
- " WHERE ? < stream_id AND stream_id <= ?"
- " ORDER BY stream_id ASC LIMIT ?"
- )
+ sql = """
+ SELECT stream_id, user_id
+ FROM push_rules_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
txn.execute(sql, (last_id, current_id, limit))
- return txn.fetchall()
+ updates = [(stream_id, (user_id,)) for stream_id, user_id in txn]
+
+ limited = False
+ upper_bound = current_id
+ if len(updates) == limit:
+ limited = True
+ upper_bound = updates[-1][0]
+
+ return updates, upper_bound, limited
- return self.db.runInteraction(
+ return await self.db.runInteraction(
"get_all_push_rule_updates", get_all_push_rule_updates_txn
)
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index cebdcd409f..8f5505bd67 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -16,6 +16,7 @@
import abc
import logging
+from typing import List, Tuple
from canonicaljson import json
@@ -24,6 +25,7 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import Database
from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -266,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore):
}
return results
- def get_all_updated_receipts(self, last_id, current_id, limit=None):
+ def get_users_sent_receipts_between(self, last_id: int, current_id: int):
+ """Get all users who sent receipts between `last_id` exclusive and
+ `current_id` inclusive.
+
+ Returns:
+ Deferred[List[str]]
+ """
+
if last_id == current_id:
return defer.succeed([])
- def get_all_updated_receipts_txn(txn):
- sql = (
- "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
- " FROM receipts_linearized"
- " WHERE ? < stream_id AND stream_id <= ?"
- " ORDER BY stream_id ASC"
- )
- args = [last_id, current_id]
- if limit is not None:
- sql += " LIMIT ?"
- args.append(limit)
- txn.execute(sql, args)
+ def _get_users_sent_receipts_between_txn(txn):
+ sql = """
+ SELECT DISTINCT user_id FROM receipts_linearized
+ WHERE ? < stream_id AND stream_id <= ?
+ """
+ txn.execute(sql, (last_id, current_id))
- return [r[0:5] + (json.loads(r[5]),) for r in txn]
+ return [r[0] for r in txn]
return self.db.runInteraction(
+ "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn
+ )
+
+ async def get_all_updated_receipts(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for receipts replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
+ if last_id == current_id:
+ return [], current_id, False
+
+ def get_all_updated_receipts_txn(txn):
+ sql = """
+ SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+ FROM receipts_linearized
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, limit))
+
+ updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]
+
+ limited = False
+ upper_bound = current_id
+
+ if len(updates) == limit:
+ limited = True
+ upper_bound = updates[-1][0]
+
+ return updates, upper_bound, limited
+
+ return await self.db.runInteraction(
"get_all_updated_receipts", get_all_updated_receipts_txn
)
@@ -300,10 +355,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
room_id, None, update_metrics=False
)
- # first handle the Deferred case
- if isinstance(res, defer.Deferred):
- if res.called:
- res = res.result
+ # first handle the ObservableDeferred case
+ if isinstance(res, ObservableDeferred):
+ if res.has_called():
+ res = res.get_result()
else:
res = None
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 9768981891..587d4b91c1 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -19,8 +19,6 @@ import logging
import re
from typing import Optional
-from six import iterkeys
-
from twisted.internet import defer
from twisted.internet.defer import Deferred
@@ -753,7 +751,7 @@ class RegistrationWorkerStore(SQLBaseStore):
last_send_attempt, validated_at
FROM threepid_validation_session WHERE %s
""" % (
- " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+ " AND ".join("%s = ?" % k for k in keyvalues.keys()),
)
if validated is not None:
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 46f643c6b9..13e366536a 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -626,36 +626,10 @@ class RoomWorkerStore(SQLBaseStore):
def _quarantine_media_in_room_txn(txn):
local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
- total_media_quarantined = 0
-
- # Now update all the tables to set the quarantined_by flag
-
- txn.executemany(
- """
- UPDATE local_media_repository
- SET quarantined_by = ?
- WHERE media_id = ?
- """,
- ((quarantined_by, media_id) for media_id in local_mxcs),
- )
-
- txn.executemany(
- """
- UPDATE remote_media_cache
- SET quarantined_by = ?
- WHERE media_origin = ? AND media_id = ?
- """,
- (
- (quarantined_by, origin, media_id)
- for origin, media_id in remote_mxcs
- ),
+ return self._quarantine_media_txn(
+ txn, local_mxcs, remote_mxcs, quarantined_by
)
- total_media_quarantined += len(local_mxcs)
- total_media_quarantined += len(remote_mxcs)
-
- return total_media_quarantined
-
return self.db.runInteraction(
"quarantine_media_in_room", _quarantine_media_in_room_txn
)
@@ -805,17 +779,17 @@ class RoomWorkerStore(SQLBaseStore):
Returns:
The total number of media items quarantined
"""
- total_media_quarantined = 0
-
# Update all the tables to set the quarantined_by flag
txn.executemany(
"""
UPDATE local_media_repository
SET quarantined_by = ?
- WHERE media_id = ?
+ WHERE media_id = ? AND safe_from_quarantine = ?
""",
- ((quarantined_by, media_id) for media_id in local_mxcs),
+ ((quarantined_by, media_id, False) for media_id in local_mxcs),
)
+ # Note that a rowcount of -1 can be used to indicate no rows were affected.
+ total_media_quarantined = txn.rowcount if txn.rowcount > 0 else 0
txn.executemany(
"""
@@ -825,9 +799,7 @@ class RoomWorkerStore(SQLBaseStore):
""",
((quarantined_by, origin, media_id) for origin, media_id in remote_mxcs),
)
-
- total_media_quarantined += len(local_mxcs)
- total_media_quarantined += len(remote_mxcs)
+ total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 137ebac833..44bab65eac 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -17,8 +17,6 @@
import logging
from typing import Iterable, List, Set
-from six import iteritems, itervalues
-
from canonicaljson import json
from twisted.internet import defer
@@ -544,7 +542,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
users_in_room = {}
member_event_ids = [
e_id
- for key, e_id in iteritems(current_state_ids)
+ for key, e_id in current_state_ids.items()
if key[0] == EventTypes.Member
]
@@ -561,7 +559,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
users_in_room = dict(prev_res)
member_event_ids = [
e_id
- for key, e_id in iteritems(context.delta_ids)
+ for key, e_id in context.delta_ids.items()
if key[0] == EventTypes.Member
]
for etype, state_key in context.delta_ids:
@@ -1101,7 +1099,7 @@ class _JoinedHostsCache(object):
if state_entry.state_group == self.state_group:
pass
elif state_entry.prev_group == self.state_group:
- for (typ, state_key), event_id in iteritems(state_entry.delta_ids):
+ for (typ, state_key), event_id in state_entry.delta_ids.items():
if typ != EventTypes.Member:
continue
@@ -1131,7 +1129,7 @@ class _JoinedHostsCache(object):
self.state_group = state_entry.state_group
else:
self.state_group = object()
- self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
+ self._len = sum(len(v) for v in self.hosts_to_joined_users.values())
return frozenset(self.hosts_to_joined_users)
def __len__(self):
diff --git a/synapse/storage/data_stores/main/schema/delta/30/as_users.py b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
index 9b95411fb6..b42c02710a 100644
--- a/synapse/storage/data_stores/main/schema/delta/30/as_users.py
+++ b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
@@ -13,8 +13,6 @@
# limitations under the License.
import logging
-from six.moves import range
-
from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres
new file mode 100644
index 0000000000..597f2ffd3d
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The local_media_repository should have files which do not get quarantined,
+-- e.g. files from sticker packs.
+ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT FALSE;
diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite
new file mode 100644
index 0000000000..69db89ac0e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The local_media_repository should have files which do not get quarantined,
+-- e.g. files from sticker packs.
+ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT 0;
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 13f49d8060..a8381dc577 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -17,8 +17,6 @@ import logging
import re
from collections import namedtuple
-from six import string_types
-
from canonicaljson import json
from twisted.internet import defer
@@ -180,7 +178,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# skip over it.
continue
- if not isinstance(value, string_types):
+ if not isinstance(value, str):
# If the event body, name or topic isn't a string
# then skip over it
continue
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index e89f0bffb5..379d758b5d 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -40,8 +40,6 @@ import abc
import logging
from collections import namedtuple
-from six.moves import range
-
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 4219018302..f8c776be3f 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -16,8 +16,6 @@
import logging
-from six.moves import range
-
from canonicaljson import json
from twisted.internet import defer
diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py
index 1d8ee22fb1..ec2f38c373 100644
--- a/synapse/storage/data_stores/main/ui_auth.py
+++ b/synapse/storage/data_stores/main/ui_auth.py
@@ -186,7 +186,7 @@ class UIAuthWorkerStore(SQLBaseStore):
# The clientdict gets stored as JSON.
clientdict_json = json.dumps(clientdict)
- self.db.simple_update_one(
+ await self.db.simple_update_one(
table="ui_auth_sessions",
keyvalues={"session_id": session_id},
updatevalues={"clientdict": clientdict_json},
diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py
index ff000bc9ec..be1fe97d79 100644
--- a/synapse/storage/data_stores/state/bg_updates.py
+++ b/synapse/storage/data_stores/state/bg_updates.py
@@ -15,8 +15,6 @@
import logging
-from six import iteritems
-
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
@@ -280,7 +278,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
delta_state = {
key: value
- for key, value in iteritems(curr_state)
+ for key, value in curr_state.items()
if prev_state.get(key, None) != value
}
@@ -316,7 +314,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in iteritems(delta_state)
+ for key, state_id in delta_state.items()
],
)
diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py
index f3ad1e4369..5db9f20135 100644
--- a/synapse/storage/data_stores/state/store.py
+++ b/synapse/storage/data_stores/state/store.py
@@ -17,9 +17,6 @@ import logging
from collections import namedtuple
from typing import Dict, Iterable, List, Set, Tuple
-from six import iteritems
-from six.moves import range
-
from twisted.internet import defer
from synapse.api.constants import EventTypes
@@ -263,7 +260,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
# And finally update the result dict, by filtering out any extra
# stuff we pulled out of the database.
- for group, group_state_dict in iteritems(group_to_state_dict):
+ for group, group_state_dict in group_to_state_dict.items():
# We just replace any existing entries, as we will have loaded
# everything we need from the database anyway.
state[group] = state_filter.filter_state(group_state_dict)
@@ -341,11 +338,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
else:
non_member_types = non_member_filter.concrete_types()
- for group, group_state_dict in iteritems(group_to_state_dict):
+ for group, group_state_dict in group_to_state_dict.items():
state_dict_members = {}
state_dict_non_members = {}
- for k, v in iteritems(group_state_dict):
+ for k, v in group_state_dict.items():
if k[0] == EventTypes.Member:
state_dict_members[k] = v
else:
@@ -432,7 +429,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in iteritems(delta_ids)
+ for key, state_id in delta_ids.items()
],
)
else:
@@ -447,7 +444,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in iteritems(current_state_ids)
+ for key, state_id in current_state_ids.items()
],
)
@@ -458,7 +455,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
current_member_state_ids = {
s: ev
- for (s, ev) in iteritems(current_state_ids)
+ for (s, ev) in current_state_ids.items()
if s[0] == EventTypes.Member
}
txn.call_after(
@@ -470,7 +467,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
current_non_member_state_ids = {
s: ev
- for (s, ev) in iteritems(current_state_ids)
+ for (s, ev) in current_state_ids.items()
if s[0] != EventTypes.Member
}
txn.call_after(
@@ -555,7 +552,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in iteritems(curr_state)
+ for key, state_id in curr_state.items()
],
)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b112ff3df2..3be20c866a 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -16,6 +16,7 @@
# limitations under the License.
import logging
import time
+from sys import intern
from time import monotonic as monotonic_time
from typing import (
Any,
@@ -29,9 +30,6 @@ from typing import (
TypeVar,
)
-from six import iteritems, iterkeys, itervalues
-from six.moves import intern, range
-
from prometheus_client import Histogram
from twisted.enterprise import adbapi
@@ -259,7 +257,7 @@ class PerformanceCounters(object):
def interval(self, interval_duration_secs, limit=3):
counters = []
- for name, (count, cum_time) in iteritems(self.current_counters):
+ for name, (count, cum_time) in self.current_counters.items():
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
counters.append(
(
@@ -1053,7 +1051,7 @@ class Database(object):
sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table}
if keyvalues:
- sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+ sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
txn.execute(sql, list(keyvalues.values()))
else:
txn.execute(sql)
@@ -1191,7 +1189,7 @@ class Database(object):
clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
clauses = [clause]
- for key, value in iteritems(keyvalues):
+ for key, value in keyvalues.items():
clauses.append("%s = ?" % (key,))
values.append(value)
@@ -1212,7 +1210,7 @@ class Database(object):
@staticmethod
def simple_update_txn(txn, table, keyvalues, updatevalues):
if keyvalues:
- where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+ where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
else:
where = ""
@@ -1351,7 +1349,7 @@ class Database(object):
clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
clauses = [clause]
- for key, value in iteritems(keyvalues):
+ for key, value in keyvalues.items():
clauses.append("%s = ?" % (key,))
values.append(value)
@@ -1388,7 +1386,7 @@ class Database(object):
txn.close()
if cache:
- min_val = min(itervalues(cache))
+ min_val = min(cache.values())
else:
min_val = max_value
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index f159400a87..ec894a91cb 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -20,9 +20,6 @@ import logging
from collections import deque, namedtuple
from typing import Iterable, List, Optional, Set, Tuple
-from six import iteritems
-from six.moves import range
-
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -218,7 +215,7 @@ class EventsPersistenceStorage(object):
partitioned.setdefault(event.room_id, []).append((event, ctx))
deferreds = []
- for room_id, evs_ctxs in iteritems(partitioned):
+ for room_id, evs_ctxs in partitioned.items():
d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
)
@@ -319,7 +316,7 @@ class EventsPersistenceStorage(object):
(event, context)
)
- for room_id, ev_ctx_rm in iteritems(events_by_room):
+ for room_id, ev_ctx_rm in events_by_room.items():
latest_event_ids = await self.main_store.get_latest_event_ids_in_room(
room_id
)
@@ -674,7 +671,7 @@ class EventsPersistenceStorage(object):
to_insert = {
key: ev_id
- for key, ev_id in iteritems(current_state)
+ for key, ev_id in current_state.items()
if ev_id != existing_state.get(key)
}
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c522c80922..dc568476f4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,8 +16,6 @@
import logging
from typing import Iterable, List, TypeVar
-from six import iteritems, itervalues
-
import attr
from twisted.internet import defer
@@ -51,7 +49,7 @@ class StateFilter(object):
# If `include_others` is set we canonicalise the filter by removing
# wildcards from the types dictionary
if self.include_others:
- self.types = {k: v for k, v in iteritems(self.types) if v is not None}
+ self.types = {k: v for k, v in self.types.items() if v is not None}
@staticmethod
def all():
@@ -150,7 +148,7 @@ class StateFilter(object):
has_non_member_wildcard = self.include_others or any(
state_keys is None
- for t, state_keys in iteritems(self.types)
+ for t, state_keys in self.types.items()
if t != EventTypes.Member
)
@@ -199,7 +197,7 @@ class StateFilter(object):
# First we build up a lost of clauses for each type/state_key combo
clauses = []
- for etype, state_keys in iteritems(self.types):
+ for etype, state_keys in self.types.items():
if state_keys is None:
clauses.append("(type = ?)")
where_args.append(etype)
@@ -251,7 +249,7 @@ class StateFilter(object):
return dict(state_dict)
filtered_state = {}
- for k, v in iteritems(state_dict):
+ for k, v in state_dict.items():
typ, state_key = k
if typ in self.types:
state_keys = self.types[typ]
@@ -279,7 +277,7 @@ class StateFilter(object):
"""
return self.include_others or any(
- state_keys is None for state_keys in itervalues(self.types)
+ state_keys is None for state_keys in self.types.values()
)
def concrete_types(self):
@@ -292,7 +290,7 @@ class StateFilter(object):
"""
return [
(t, s)
- for t, state_keys in iteritems(self.types)
+ for t, state_keys in self.types.items()
if state_keys is not None
for s in state_keys
]
@@ -324,7 +322,7 @@ class StateFilter(object):
member_filter = StateFilter.none()
non_member_filter = StateFilter(
- types={k: v for k, v in iteritems(self.types) if k != EventTypes.Member},
+ types={k: v for k, v in self.types.items() if k != EventTypes.Member},
include_others=self.include_others,
)
@@ -366,7 +364,7 @@ class StateGroupStorage(object):
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
- groups = set(itervalues(event_to_groups))
+ groups = set(event_to_groups.values())
group_to_state = yield self.stores.state._get_state_for_groups(groups)
return group_to_state
@@ -400,8 +398,8 @@ class StateGroupStorage(object):
state_event_map = yield self.stores.main.get_events(
[
ev_id
- for group_ids in itervalues(group_to_ids)
- for ev_id in itervalues(group_ids)
+ for group_ids in group_to_ids.values()
+ for ev_id in group_ids.values()
],
get_prev_content=False,
)
@@ -409,10 +407,10 @@ class StateGroupStorage(object):
return {
group: [
state_event_map[v]
- for v in itervalues(event_id_map)
+ for v in event_id_map.values()
if v in state_event_map
]
- for group, event_id_map in iteritems(group_to_ids)
+ for group, event_id_map in group_to_ids.items()
}
def _get_state_groups_from_groups(
@@ -444,23 +442,23 @@ class StateGroupStorage(object):
"""
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
- groups = set(itervalues(event_to_groups))
+ groups = set(event_to_groups.values())
group_to_state = yield self.stores.state._get_state_for_groups(
groups, state_filter
)
state_event_map = yield self.stores.main.get_events(
- [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
+ [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
get_prev_content=False,
)
event_to_state = {
event_id: {
k: state_event_map[v]
- for k, v in iteritems(group_to_state[group])
+ for k, v in group_to_state[group].items()
if v in state_event_map
}
- for event_id, group in iteritems(event_to_groups)
+ for event_id, group in event_to_groups.items()
}
return {event: event_to_state[event] for event in event_ids}
@@ -481,14 +479,14 @@ class StateGroupStorage(object):
"""
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
- groups = set(itervalues(event_to_groups))
+ groups = set(event_to_groups.values())
group_to_state = yield self.stores.state._get_state_for_groups(
groups, state_filter
)
event_to_state = {
event_id: group_to_state[group]
- for event_id, group in iteritems(event_to_groups)
+ for event_id, group in event_to_groups.items()
}
return {event: event_to_state[event] for event in event_ids}
|