1 files changed, 8 insertions, 17 deletions
diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py
index f07309ef09..6b03233262 100644
--- a/synapse/storage/data_stores/main/pusher.py
+++ b/synapse/storage/data_stores/main/pusher.py
@@ -15,8 +15,7 @@
# limitations under the License.
import logging
-
-import six
+from typing import Iterable, Iterator
from canonicaljson import encode_canonical_json, json
@@ -27,21 +26,16 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
logger = logging.getLogger(__name__)
-if six.PY2:
- db_binary_type = six.moves.builtins.buffer
-else:
- db_binary_type = memoryview
-
class PusherWorkerStore(SQLBaseStore):
- def _decode_pushers_rows(self, rows):
+ def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[dict]:
+ """JSON-decode the data in the rows returned from the `pushers` table
+
+ Drops any rows whose data cannot be decoded
+ """
for r in rows:
dataJson = r["data"]
- r["data"] = None
try:
- if isinstance(dataJson, db_binary_type):
- dataJson = str(dataJson).decode("UTF8")
-
r["data"] = json.loads(dataJson)
except Exception as e:
logger.warning(
@@ -50,12 +44,9 @@ class PusherWorkerStore(SQLBaseStore):
dataJson,
e.args[0],
)
- pass
-
- if isinstance(r["pushkey"], db_binary_type):
- r["pushkey"] = str(r["pushkey"]).decode("UTF8")
+ continue
- return rows
+ yield r
@defer.inlineCallbacks
def user_has_pusher(self, user_id):
|