add asynchronous function for testing purpose

sync_v2
Laurent 7 months ago
parent f32bc866f5
commit b32b0f2a74
  1. 74
      LeStorage/ApiCallCollection.swift
  2. 6
      LeStorage/StoreCenter.swift
  3. 149
      LeStorage/SyncedCollection.swift

@ -180,7 +180,7 @@ actor ApiCallCollection<T: SyncedStorable>: SomeCallCollection {
}
}
}
/// Reschedule the execution of API calls
fileprivate func _waitAndExecuteApiCalls() async {
@ -194,7 +194,37 @@ actor ApiCallCollection<T: SyncedStorable>: SomeCallCollection {
await self._wait()
await self._batchExecution()
// Logger.log("\(T.resourceName()) > EXECUTE CALLS: \(self.items.count)")
// let batches = Dictionary(grouping: self.items, by: { $0.transactionId })
//
// for batch in batches.values {
// do {
// if batch.count == 1, let apiCall = batch.first, apiCall.method == .get {
// try await self._executeGetCall(apiCall: apiCall)
// } else {
// let results = try await self._executeApiCalls(batch)
// if T.copyServerResponse {
// let instances = results.compactMap { $0.data }
// StoreCenter.main.updateLocalInstances(instances)
// }
// }
// } catch {
// Logger.error(error)
// }
// }
// Logger.log("\(T.resourceName()) > EXECUTE CALLS ENDED !")
self._isExecutingCalls = false
if self.items.isNotEmpty {
await self._waitAndExecuteApiCalls()
}
// Logger.log("\(T.resourceName()) > isRescheduling = \(self._isRescheduling)")
}
fileprivate func _batchExecution() async {
let batches = Dictionary(grouping: self.items, by: { $0.transactionId })
for batch in batches.values {
@ -212,14 +242,6 @@ actor ApiCallCollection<T: SyncedStorable>: SomeCallCollection {
Logger.error(error)
}
}
// Logger.log("\(T.resourceName()) > EXECUTE CALLS ENDED !")
self._isExecutingCalls = false
if self.items.isNotEmpty {
await self._waitAndExecuteApiCalls()
}
// Logger.log("\(T.resourceName()) > isRescheduling = \(self._isRescheduling)")
}
fileprivate func _executeGetCall(apiCall: ApiCall<T>) async throws {
@ -257,7 +279,7 @@ actor ApiCallCollection<T: SyncedStorable>: SomeCallCollection {
/// The method makes some clean up when necessary:
/// - When deleting, we delete other calls as they are unecessary
/// - When updating, we delete other PUT as we don't want them to be executed in random orders
func callForInstance(_ instance: T, method: HTTPMethod, transactionId: String? = nil) -> ApiCall<T> {
fileprivate func _prepareCall(instance: T, method: HTTPMethod, transactionId: String? = nil) {
// cleanup if necessary
switch method {
@ -272,9 +294,7 @@ actor ApiCallCollection<T: SyncedStorable>: SomeCallCollection {
}
let call: ApiCall<T> = self._createCall(method, instance: instance, transactionId: transactionId)
self._prepareCall(apiCall: call)
return call
self._addCallToWaitingList(call)
}
/// deletes an array of ApiCall by id
@ -305,7 +325,7 @@ actor ApiCallCollection<T: SyncedStorable>: SomeCallCollection {
}
/// Prepares a call for execution by updating its properties and adding it to its collection for storage
fileprivate func _prepareCall(apiCall: ApiCall<T>) {
fileprivate func _addCallToWaitingList(_ apiCall: ApiCall<T>) {
apiCall.lastAttemptDate = Date()
apiCall.attemptsCount += 1
self.addOrUpdate(apiCall)
@ -340,30 +360,34 @@ actor ApiCallCollection<T: SyncedStorable>: SomeCallCollection {
self.rescheduleImmediately()
}
}
/// Creates and execute the ApiCalls corresponding to the [batch]
func executeBatch(_ batch: OperationBatch<T>) {
self._prepareCalls(batch: batch)
self.rescheduleImmediately()
}
func singleBatchExecution(_ batch: OperationBatch<T>) async {
self._prepareCalls(batch: batch)
await self._batchExecution()
}
var apiCalls: [ApiCall<T>] = []
fileprivate func _prepareCalls(batch: OperationBatch<T>) {
let transactionId = Store.randomId()
for insert in batch.inserts {
let call = self.callForInstance(insert, method: .post, transactionId: transactionId)
apiCalls.append(call)
self._prepareCall(instance: insert, method: .post, transactionId: transactionId)
}
for update in batch.updates {
let call = self.callForInstance(update, method: .put, transactionId: transactionId)
apiCalls.append(call)
self._prepareCall(instance: update, method: .put, transactionId: transactionId)
}
for delete in batch.deletes {
let call = self.callForInstance(delete, method: .delete, transactionId: transactionId)
apiCalls.append(call)
self._prepareCall(instance: delete, method: .delete, transactionId: transactionId)
}
self.rescheduleImmediately()
}
/// Prepares and executes a GET call
fileprivate func _prepareAndSendGetCall(_ apiCall: ApiCall<T>) async throws {
self._prepareCall(apiCall: apiCall)
self._addCallToWaitingList(apiCall)
try await self._executeGetCall(apiCall: apiCall)
}

@ -283,7 +283,7 @@ public class StoreCenter {
Task {
do {
try await apiCallCollection.loadFromFile()
let count = await apiCallCollection.items.count
// let count = await apiCallCollection.items.count
// Logger.log("collection \(T.resourceName()) loaded with \(count)")
await apiCallCollection.rescheduleApiCallsIfNecessary()
} catch {
@ -428,6 +428,10 @@ public class StoreCenter {
return try await self.apiCallCollection().executeBatch(batch)
}
func singleBatchExecution<T: SyncedStorable>(_ batch: OperationBatch<T>) async throws {
return try await self.apiCallCollection().singleBatchExecution(batch)
}
/// Transmit the insertion request to the ApiCall collection
/// - Parameters:
/// - instance: an object to insert

@ -87,26 +87,78 @@ public class SyncedCollection<T : SyncedStorable>: BaseCollection<T>, SomeSynced
// MARK: - Basic operations with sync
/// Adds or update an instance and writes
/// Adds or update an instance asynchronously and waits for network operations
func addOrUpdateAsync(instance: T) async {
if let result = _addOrUpdateCore(instance: instance) {
if result.isNewItem {
await self._executeBatchOnce(OperationBatch(insert: result.item))
} else {
await self._executeBatchOnce(OperationBatch(update: result.item))
}
}
}
/// Adds or update an instance synchronously, dispatching network operations to background tasks
public override func addOrUpdate(instance: T) {
if let result = _addOrUpdateCore(instance: instance) {
if result.isNewItem {
Task { await self._sendInsertion(result.item) }
} else {
Task { await self._sendUpdate(result.item) }
}
}
}
/// Private helper function that contains the shared logic
private func _addOrUpdateCore(instance: T) -> (item: T, isNewItem: Bool)? {
instance.lastUpdate = Date()
if let index = self.items.firstIndex(where: { $0.id == instance.id }) {
if self.updateItem(instance, index: index, shouldBeSynchronized: true) {
self._sendUpdate(instance)
self.setChanged()
return (instance, false)
}
} else {
if self.addItem(instance: instance, shouldBeSynchronized: true) {
self._sendInsertion(instance)
self.setChanged()
return (instance, true)
}
}
return nil
}
/// Adds or update a sequence and writes
override public func addOrUpdate(contentOfs sequence: any Sequence<T>) {
// Logger.log("\(T.resourceName()) : \(sequence.underestimatedCount) items")
// func addOrUpdateAsync(instance: T) async {
// instance.lastUpdate = Date()
// if let index = self.items.firstIndex(where: { $0.id == instance.id }) {
// if self.updateItem(instance, index: index, shouldBeSynchronized: true) {
// await self._sendUpdate(instance)
// self.setChanged()
// }
// } else {
// if self.addItem(instance: instance, shouldBeSynchronized: true) {
// await self._sendInsertion(instance)
// self.setChanged()
// }
// }
// }
//
// /// Adds or update an instance and writes
// public override func addOrUpdate(instance: T) {
// instance.lastUpdate = Date()
// if let index = self.items.firstIndex(where: { $0.id == instance.id }) {
// if self.updateItem(instance, index: index, shouldBeSynchronized: true) {
// Task { await self._sendUpdate(instance) }
// self.setChanged()
// }
// } else {
// if self.addItem(instance: instance, shouldBeSynchronized: true) {
// Task { await self._sendInsertion(instance) }
// self.setChanged()
// }
// }
// }
fileprivate func _addOrUpdateCore(contentOfs sequence: any Sequence<T>) -> OperationBatch<T> {
defer {
self.setChanged()
}
@ -126,9 +178,19 @@ public class SyncedCollection<T : SyncedStorable>: BaseCollection<T>, SomeSynced
}
}
}
return batch
self._sendOperationBatch(batch)
}
/// Adds or update a sequence and writes
override public func addOrUpdate(contentOfs sequence: any Sequence<T>) {
let batch = self._addOrUpdateCore(contentOfs: sequence)
Task { await self._sendOperationBatch(batch) }
}
func addOrUpdateAsync(contentOfs sequence: any Sequence<T>) async {
let batch = self._addOrUpdateCore(contentOfs: sequence)
await self._executeBatchOnce(batch)
}
/// Proceeds to delete all instance of the collection, properly cleaning up dependencies and sending API calls
@ -137,13 +199,11 @@ public class SyncedCollection<T : SyncedStorable>: BaseCollection<T>, SomeSynced
}
/// Deletes all items of the sequence by id and sets the collection as changed to trigger a write
public override func delete(contentOfs sequence: any RandomAccessCollection<T>) {
fileprivate func _deleteCore(contentOfs sequence: any RandomAccessCollection<T>) -> OperationBatch<T> {
defer {
self.setChanged()
}
guard sequence.isNotEmpty else { return }
var deleted: [T] = []
@ -156,7 +216,30 @@ public class SyncedCollection<T : SyncedStorable>: BaseCollection<T>, SomeSynced
let batch = OperationBatch<T>()
batch.deletes = deleted
self._sendOperationBatch(batch)
return batch
}
/// Deletes all items of the sequence by id and sets the collection as changed to trigger a write
public override func delete(contentOfs sequence: any RandomAccessCollection<T>) {
guard sequence.isNotEmpty else { return }
let batch = self._deleteCore(contentOfs: sequence)
Task { await self._sendOperationBatch(batch) }
}
/// Deletes all items of the sequence by id and sets the collection as changed to trigger a write
public func deleteAsync(contentOfs sequence: any RandomAccessCollection<T>) async {
guard sequence.isNotEmpty else { return }
let batch = self._deleteCore(contentOfs: sequence)
await self._executeBatchOnce(batch)
}
/// Deletes an instance and writes
func deleteAsync(instance: T) async {
defer {
self.setChanged()
}
self._deleteNoWrite(instance: instance)
Task { await self._executeBatchOnce(OperationBatch(delete: instance)) }
}
/// Deletes an instance and writes
@ -165,14 +248,14 @@ public class SyncedCollection<T : SyncedStorable>: BaseCollection<T>, SomeSynced
self.setChanged()
}
self._deleteNoWrite(instance: instance)
Task { await self._sendDeletion(instance) }
}
/// Deletes an instance without writing, logs the operation and sends an API call
fileprivate func _deleteNoWrite(instance: T) {
self.deleteItem(instance, shouldBeSynchronized: true)
StoreCenter.main.createDeleteLog(instance)
self._sendDeletion(instance)
// await self._sendDeletion(instance)
}
public func deleteDependencies(_ items: any RandomAccessCollection<T>, shouldBeSynchronized: Bool) {
@ -227,25 +310,31 @@ public class SyncedCollection<T : SyncedStorable>: BaseCollection<T>, SomeSynced
// MARK: - Send requests
fileprivate func _sendInsertion(_ instance: T) {
self._sendOperationBatch(OperationBatch(insert: instance))
fileprivate func _sendInsertion(_ instance: T) async {
await self._sendOperationBatch(OperationBatch(insert: instance))
}
fileprivate func _sendUpdate(_ instance: T) {
self._sendOperationBatch(OperationBatch(update: instance))
fileprivate func _sendUpdate(_ instance: T) async {
await self._sendOperationBatch(OperationBatch(update: instance))
}
fileprivate func _sendDeletion(_ instance: T) {
self._sendOperationBatch(OperationBatch(delete: instance))
fileprivate func _sendDeletion(_ instance: T) async {
await self._sendOperationBatch(OperationBatch(delete: instance))
}
fileprivate func _sendOperationBatch(_ batch: OperationBatch<T>) {
Task {
do {
try await StoreCenter.main.sendOperationBatch(batch)
} catch {
Logger.error(error)
}
fileprivate func _sendOperationBatch(_ batch: OperationBatch<T>) async {
do {
try await StoreCenter.main.sendOperationBatch(batch)
} catch {
Logger.error(error)
}
}
fileprivate func _executeBatchOnce(_ batch: OperationBatch<T>) async {
do {
try await StoreCenter.main.singleBatchExecution(batch)
} catch {
Logger.error(error)
}
}
@ -333,11 +422,11 @@ public class SyncedCollection<T : SyncedStorable>: BaseCollection<T>, SomeSynced
// MARK: - Others
/// Sends a POST request for the instance, and changes the collection to perform a write
public func writeChangeAndInsertOnServer(instance: T) {
public func writeChangeAndInsertOnServer(instance: T) async {
defer {
self.setChanged()
}
self._sendInsertion(instance)
await self._sendInsertion(instance)
}
}

Loading…
Cancel
Save