Do parallel checking and update notification with progress

This commit is contained in:
Torsten Grote 2024-10-23 16:29:36 -03:00
parent 591dfe0bd6
commit 51a6355205
No known key found for this signature in database
GPG key ID: 3E5F77D92CF891FF
4 changed files with 77 additions and 37 deletions

View file

@ -10,18 +10,22 @@ import com.stevesoltys.seedvault.backend.BackendManager
import com.stevesoltys.seedvault.crypto.Crypto import com.stevesoltys.seedvault.crypto.Crypto
import com.stevesoltys.seedvault.proto.Snapshot import com.stevesoltys.seedvault.proto.Snapshot
import com.stevesoltys.seedvault.proto.Snapshot.Blob import com.stevesoltys.seedvault.proto.Snapshot.Blob
import com.stevesoltys.seedvault.ui.notification.BackupNotificationManager
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import org.calyxos.seedvault.core.backends.AppBackupFileType import org.calyxos.seedvault.core.backends.AppBackupFileType
import org.calyxos.seedvault.core.backends.TopLevelFolder import org.calyxos.seedvault.core.backends.TopLevelFolder
import org.calyxos.seedvault.core.toHexString import org.calyxos.seedvault.core.toHexString
import org.koin.core.time.measureTimedValue
import java.security.DigestInputStream import java.security.DigestInputStream
import java.security.GeneralSecurityException import java.security.GeneralSecurityException
import java.security.MessageDigest import java.security.MessageDigest
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.min import kotlin.math.min
import kotlin.math.roundToInt import kotlin.math.roundToInt
import kotlin.math.roundToLong import kotlin.math.roundToLong
import kotlin.time.Duration.Companion.milliseconds
@WorkerThread @WorkerThread
internal class Checker( internal class Checker(
@ -29,10 +33,16 @@ internal class Checker(
private val backendManager: BackendManager, private val backendManager: BackendManager,
private val snapshotManager: SnapshotManager, private val snapshotManager: SnapshotManager,
private val loader: Loader, private val loader: Loader,
private val nm: BackupNotificationManager,
) { ) {
private val log = KotlinLogging.logger { } private val log = KotlinLogging.logger { }
private var snapshots: List<Snapshot>? = null private var snapshots: List<Snapshot>? = null
private val concurrencyLimit: Int
get() {
// TODO determine also based on backendManager
return Runtime.getRuntime().availableProcessors()
}
suspend fun getBackupSize(): Long { suspend fun getBackupSize(): Long {
// get all snapshots // get all snapshots
@ -58,32 +68,45 @@ internal class Checker(
if (snapshots == null) getBackupSize() // just get size again to be sure we get snapshots if (snapshots == null) getBackupSize() // just get size again to be sure we get snapshots
val snapshots = snapshots ?: error("Snapshots still null") val snapshots = snapshots ?: error("Snapshots still null")
val blobSample = getBlobSample(snapshots, percent)
val sampleSize = blobSample.values.sumOf { it.length.toLong() }
log.info { "Blob sample has ${blobSample.size} blobs worth $sampleSize bytes." }
val messageDigest = MessageDigest.getInstance("SHA-256") // check blobs concurrently
val (checkedSize, time) = measureTimedValue { val semaphore = Semaphore(concurrencyLimit)
checkBlobs(snapshots, percent) { chunkId, blob -> val size = AtomicLong()
val storageId = blob.id.hexFromProto() val lastNotification = AtomicLong()
log.info { "Checking blob $storageId..." } val startTime = System.currentTimeMillis()
val handle = AppBackupFileType.Blob(crypto.repoId, storageId) coroutineScope {
loader.loadFile(handle, null).close() blobSample.forEach { (chunkId, blob) ->
val readChunkId = loader.loadFile(handle, null).use { inputStream -> // launch a new co-routine for each blob to check
DigestInputStream(inputStream, messageDigest).use { digestStream -> launch {
digestStream.readAllBytes() // suspend here until we get a permit from the semaphore (there's free workers)
digestStream.messageDigest.digest().toHexString() semaphore.withPermit {
// TODO record errors
checkBlob(chunkId, blob)
}
// keep track of how much we checked and for how long
val newSize = size.addAndGet(blob.length.toLong())
val passedTime = System.currentTimeMillis() - startTime
// only log/show notification after some time has passed (throttling)
if (passedTime > lastNotification.get() + 500) {
lastNotification.set(passedTime)
val bandwidth =
(newSize / 1024 / (passedTime.toDouble() / 1000)).roundToInt()
val thousandth = ((newSize.toDouble() / sampleSize) * 1000).roundToInt()
log.debug { "$thousandth‰ - $bandwidth KB/sec - $newSize bytes" }
nm.showCheckNotification("$bandwidth KB/sec", thousandth)
} }
} }
if (readChunkId != chunkId) throw GeneralSecurityException("ChunkId doesn't match")
} }
} }
val bandwidth = (checkedSize / 1024 / (time / 1000)).roundToInt() if (sampleSize != size.get()) log.error {
log.info { "Took ${time.milliseconds} for $checkedSize bytes, $bandwidth KB/s" } "Checked ${size.get()} bytes, but expected $sampleSize"
}
} }
private suspend fun checkBlobs( private fun getBlobSample(snapshots: List<Snapshot>, percent: Int): Map<String, Blob> {
snapshots: List<Snapshot>,
percent: Int,
blobChecker: suspend (String, Blob) -> Unit,
): Long {
// split up blobs for app data and for APKs // split up blobs for app data and for APKs
val appBlobs = mutableMapOf<String, Blob>() val appBlobs = mutableMapOf<String, Blob>()
val apkBlobs = mutableMapOf<String, Blob>() val apkBlobs = mutableMapOf<String, Blob>()
@ -111,26 +134,37 @@ internal class Checker(
val appTargetSize = min((targetSize * 0.75).roundToLong(), appSize) // 75% of targetSize val appTargetSize = min((targetSize * 0.75).roundToLong(), appSize) // 75% of targetSize
log.info { "Sampling $targetSize bytes of which $appTargetSize bytes for apps." } log.info { "Sampling $targetSize bytes of which $appTargetSize bytes for apps." }
val blobSample = mutableMapOf<String, Blob>()
var currentSize = 0L var currentSize = 0L
// check apps first until we reach their target size // check apps first until we reach their target size
val appIterator = appBlobs.keys.shuffled().iterator() // random app blob iterator val appIterator = appBlobs.keys.shuffled().iterator() // random app blob iterator
while (currentSize < appTargetSize && appIterator.hasNext()) { while (currentSize < appTargetSize && appIterator.hasNext()) {
val randomChunkId = appIterator.next() val randomChunkId = appIterator.next()
val blob = appBlobs[randomChunkId] ?: error("No blob") val blob = appBlobs[randomChunkId] ?: error("No blob")
blobChecker(randomChunkId, blob) blobSample[randomChunkId] = blob
currentSize += blob.length currentSize += blob.length
// TODO do progress reporting via system notification instead
log.info { " ${((currentSize.toDouble() / targetSize) * 100).roundToInt()}%" }
} }
// now check APKs until we reach total targetSize // now check APKs until we reach total targetSize
val apkIterator = apkBlobs.keys.shuffled().iterator() // random APK blob iterator val apkIterator = apkBlobs.keys.shuffled().iterator() // random APK blob iterator
while (currentSize < targetSize && apkIterator.hasNext()) { while (currentSize < targetSize && apkIterator.hasNext()) {
val randomChunkId = apkIterator.next() val randomChunkId = apkIterator.next()
val blob = apkBlobs[randomChunkId] ?: error("No blob") val blob = apkBlobs[randomChunkId] ?: error("No blob")
blobChecker(randomChunkId, blob) blobSample[randomChunkId] = blob
currentSize += blob.length currentSize += blob.length
log.info { " ${((currentSize.toDouble() / targetSize) * 100).roundToInt()}%" }
} }
return currentSize return blobSample
}
private suspend fun checkBlob(chunkId: String, blob: Blob) {
val messageDigest = MessageDigest.getInstance("SHA-256")
val storageId = blob.id.hexFromProto()
val handle = AppBackupFileType.Blob(crypto.repoId, storageId)
val readChunkId = loader.loadFile(handle, null).use { inputStream ->
DigestInputStream(inputStream, messageDigest).use { digestStream ->
digestStream.readAllBytes()
digestStream.messageDigest.digest().toHexString()
}
}
if (readChunkId != chunkId) throw GeneralSecurityException("ChunkId doesn't match")
} }
} }

View file

@ -21,5 +21,5 @@ val repoModule = module {
} }
factory { SnapshotCreatorFactory(androidContext(), get(), get(), get()) } factory { SnapshotCreatorFactory(androidContext(), get(), get(), get()) }
factory { Pruner(get(), get(), get()) } factory { Pruner(get(), get(), get()) }
single { Checker(get(), get(), get(), get()) } single { Checker(get(), get(), get(), get(), get()) }
} }

View file

@ -332,14 +332,20 @@ internal class BackupNotificationManager(private val context: Context) {
}.build() }.build()
} }
fun getCheckNotification(): Notification { fun getCheckNotification() = Builder(context, CHANNEL_ID_CHECKING).apply {
return Builder(context, CHANNEL_ID_CHECKING).apply {
setSmallIcon(R.drawable.ic_cloud_search) setSmallIcon(R.drawable.ic_cloud_search)
setContentTitle(context.getString(R.string.notification_checking_title)) setContentTitle(context.getString(R.string.notification_checking_title))
setOngoing(true) setOngoing(true)
setShowWhen(false) setShowWhen(false)
foregroundServiceBehavior = FOREGROUND_SERVICE_IMMEDIATE foregroundServiceBehavior = FOREGROUND_SERVICE_IMMEDIATE
}.build() }
fun showCheckNotification(text: String, thousandth: Int) {
val notification = getCheckNotification()
.setContentText(text)
.setProgress(1000, thousandth, false)
.build()
nm.notify(NOTIFICATION_ID_CHECKING, notification)
} }
@SuppressLint("RestrictedApi") @SuppressLint("RestrictedApi")

View file

@ -76,7 +76,7 @@ class AppCheckerWorker(
private fun createForegroundInfo() = ForegroundInfo( private fun createForegroundInfo() = ForegroundInfo(
NOTIFICATION_ID_CHECKING, NOTIFICATION_ID_CHECKING,
nm.getCheckNotification(), nm.getCheckNotification().build(),
FOREGROUND_SERVICE_TYPE_DATA_SYNC, FOREGROUND_SERVICE_TYPE_DATA_SYNC,
) )
} }