diff --git a/app/src/main/java/com/stevesoltys/seedvault/repo/Checker.kt b/app/src/main/java/com/stevesoltys/seedvault/repo/Checker.kt index 4c077f3a..374d497a 100644 --- a/app/src/main/java/com/stevesoltys/seedvault/repo/Checker.kt +++ b/app/src/main/java/com/stevesoltys/seedvault/repo/Checker.kt @@ -10,18 +10,22 @@ import com.stevesoltys.seedvault.backend.BackendManager import com.stevesoltys.seedvault.crypto.Crypto import com.stevesoltys.seedvault.proto.Snapshot import com.stevesoltys.seedvault.proto.Snapshot.Blob +import com.stevesoltys.seedvault.ui.notification.BackupNotificationManager import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit import org.calyxos.seedvault.core.backends.AppBackupFileType import org.calyxos.seedvault.core.backends.TopLevelFolder import org.calyxos.seedvault.core.toHexString -import org.koin.core.time.measureTimedValue import java.security.DigestInputStream import java.security.GeneralSecurityException import java.security.MessageDigest +import java.util.concurrent.atomic.AtomicLong import kotlin.math.min import kotlin.math.roundToInt import kotlin.math.roundToLong -import kotlin.time.Duration.Companion.milliseconds @WorkerThread internal class Checker( @@ -29,10 +33,16 @@ internal class Checker( private val backendManager: BackendManager, private val snapshotManager: SnapshotManager, private val loader: Loader, + private val nm: BackupNotificationManager, ) { private val log = KotlinLogging.logger { } private var snapshots: List? = null + private val concurrencyLimit: Int + get() { + // TODO determine also based on backendManager + return Runtime.getRuntime().availableProcessors() + } suspend fun getBackupSize(): Long { // get all snapshots @@ -58,32 +68,45 @@ internal class Checker( if (snapshots == null) getBackupSize() // just get size again to be sure we get snapshots val snapshots = snapshots ?: error("Snapshots still null") + val blobSample = getBlobSample(snapshots, percent) + val sampleSize = blobSample.values.sumOf { it.length.toLong() } + log.info { "Blob sample has ${blobSample.size} blobs worth $sampleSize bytes." } - val messageDigest = MessageDigest.getInstance("SHA-256") - val (checkedSize, time) = measureTimedValue { - checkBlobs(snapshots, percent) { chunkId, blob -> - val storageId = blob.id.hexFromProto() - log.info { "Checking blob $storageId..." } - val handle = AppBackupFileType.Blob(crypto.repoId, storageId) - loader.loadFile(handle, null).close() - val readChunkId = loader.loadFile(handle, null).use { inputStream -> - DigestInputStream(inputStream, messageDigest).use { digestStream -> - digestStream.readAllBytes() - digestStream.messageDigest.digest().toHexString() + // check blobs concurrently + val semaphore = Semaphore(concurrencyLimit) + val size = AtomicLong() + val lastNotification = AtomicLong() + val startTime = System.currentTimeMillis() + coroutineScope { + blobSample.forEach { (chunkId, blob) -> + // launch a new co-routine for each blob to check + launch { + // suspend here until we get a permit from the semaphore (there's free workers) + semaphore.withPermit { + // TODO record errors + checkBlob(chunkId, blob) + } + // keep track of how much we checked and for how long + val newSize = size.addAndGet(blob.length.toLong()) + val passedTime = System.currentTimeMillis() - startTime + // only log/show notification after some time has passed (throttling) + if (passedTime > lastNotification.get() + 500) { + lastNotification.set(passedTime) + val bandwidth = + (newSize / 1024 / (passedTime.toDouble() / 1000)).roundToInt() + val thousandth = ((newSize.toDouble() / sampleSize) * 1000).roundToInt() + log.debug { "$thousandth‰ - $bandwidth KB/sec - $newSize bytes" } + nm.showCheckNotification("$bandwidth KB/sec", thousandth) } } - if (readChunkId != chunkId) throw GeneralSecurityException("ChunkId doesn't match") } } - val bandwidth = (checkedSize / 1024 / (time / 1000)).roundToInt() - log.info { "Took ${time.milliseconds} for $checkedSize bytes, $bandwidth KB/s" } + if (sampleSize != size.get()) log.error { + "Checked ${size.get()} bytes, but expected $sampleSize" + } } - private suspend fun checkBlobs( - snapshots: List, - percent: Int, - blobChecker: suspend (String, Blob) -> Unit, - ): Long { + private fun getBlobSample(snapshots: List, percent: Int): Map { // split up blobs for app data and for APKs val appBlobs = mutableMapOf() val apkBlobs = mutableMapOf() @@ -111,26 +134,37 @@ internal class Checker( val appTargetSize = min((targetSize * 0.75).roundToLong(), appSize) // 75% of targetSize log.info { "Sampling $targetSize bytes of which $appTargetSize bytes for apps." } + val blobSample = mutableMapOf() var currentSize = 0L // check apps first until we reach their target size val appIterator = appBlobs.keys.shuffled().iterator() // random app blob iterator while (currentSize < appTargetSize && appIterator.hasNext()) { val randomChunkId = appIterator.next() val blob = appBlobs[randomChunkId] ?: error("No blob") - blobChecker(randomChunkId, blob) + blobSample[randomChunkId] = blob currentSize += blob.length - // TODO do progress reporting via system notification instead - log.info { " ${((currentSize.toDouble() / targetSize) * 100).roundToInt()}%" } } // now check APKs until we reach total targetSize val apkIterator = apkBlobs.keys.shuffled().iterator() // random APK blob iterator while (currentSize < targetSize && apkIterator.hasNext()) { val randomChunkId = apkIterator.next() val blob = apkBlobs[randomChunkId] ?: error("No blob") - blobChecker(randomChunkId, blob) + blobSample[randomChunkId] = blob currentSize += blob.length - log.info { " ${((currentSize.toDouble() / targetSize) * 100).roundToInt()}%" } } - return currentSize + return blobSample + } + + private suspend fun checkBlob(chunkId: String, blob: Blob) { + val messageDigest = MessageDigest.getInstance("SHA-256") + val storageId = blob.id.hexFromProto() + val handle = AppBackupFileType.Blob(crypto.repoId, storageId) + val readChunkId = loader.loadFile(handle, null).use { inputStream -> + DigestInputStream(inputStream, messageDigest).use { digestStream -> + digestStream.readAllBytes() + digestStream.messageDigest.digest().toHexString() + } + } + if (readChunkId != chunkId) throw GeneralSecurityException("ChunkId doesn't match") } } diff --git a/app/src/main/java/com/stevesoltys/seedvault/repo/RepoModule.kt b/app/src/main/java/com/stevesoltys/seedvault/repo/RepoModule.kt index e265433f..5c8ac73b 100644 --- a/app/src/main/java/com/stevesoltys/seedvault/repo/RepoModule.kt +++ b/app/src/main/java/com/stevesoltys/seedvault/repo/RepoModule.kt @@ -21,5 +21,5 @@ val repoModule = module { } factory { SnapshotCreatorFactory(androidContext(), get(), get(), get()) } factory { Pruner(get(), get(), get()) } - single { Checker(get(), get(), get(), get()) } + single { Checker(get(), get(), get(), get(), get()) } } diff --git a/app/src/main/java/com/stevesoltys/seedvault/ui/notification/BackupNotificationManager.kt b/app/src/main/java/com/stevesoltys/seedvault/ui/notification/BackupNotificationManager.kt index 3cf1230c..7234a564 100644 --- a/app/src/main/java/com/stevesoltys/seedvault/ui/notification/BackupNotificationManager.kt +++ b/app/src/main/java/com/stevesoltys/seedvault/ui/notification/BackupNotificationManager.kt @@ -332,14 +332,20 @@ internal class BackupNotificationManager(private val context: Context) { }.build() } - fun getCheckNotification(): Notification { - return Builder(context, CHANNEL_ID_CHECKING).apply { - setSmallIcon(R.drawable.ic_cloud_search) - setContentTitle(context.getString(R.string.notification_checking_title)) - setOngoing(true) - setShowWhen(false) - foregroundServiceBehavior = FOREGROUND_SERVICE_IMMEDIATE - }.build() + fun getCheckNotification() = Builder(context, CHANNEL_ID_CHECKING).apply { + setSmallIcon(R.drawable.ic_cloud_search) + setContentTitle(context.getString(R.string.notification_checking_title)) + setOngoing(true) + setShowWhen(false) + foregroundServiceBehavior = FOREGROUND_SERVICE_IMMEDIATE + } + + fun showCheckNotification(text: String, thousandth: Int) { + val notification = getCheckNotification() + .setContentText(text) + .setProgress(1000, thousandth, false) + .build() + nm.notify(NOTIFICATION_ID_CHECKING, notification) } @SuppressLint("RestrictedApi") diff --git a/app/src/main/java/com/stevesoltys/seedvault/worker/AppCheckerWorker.kt b/app/src/main/java/com/stevesoltys/seedvault/worker/AppCheckerWorker.kt index cc2e1a0b..e6fe566a 100644 --- a/app/src/main/java/com/stevesoltys/seedvault/worker/AppCheckerWorker.kt +++ b/app/src/main/java/com/stevesoltys/seedvault/worker/AppCheckerWorker.kt @@ -76,7 +76,7 @@ class AppCheckerWorker( private fun createForegroundInfo() = ForegroundInfo( NOTIFICATION_ID_CHECKING, - nm.getCheckNotification(), + nm.getCheckNotification().build(), FOREGROUND_SERVICE_TYPE_DATA_SYNC, ) }