Well, it’s time to create our first implementation, first, we need a place to store the deltas generated in the process of tracking changes in a data object.
To keep the Implementation simple, we will create a delta store that saves the deltas in memory. This delta store can also be used for testing purposes
MemoryDeltaStore
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/MemoryDeltaStore.cs
public class MemoryDeltaStore : BIT.Data.Sync.DeltaStoreBase
{
IList<IDelta> Deltas;
public MemoryDeltaStore(IEnumerable<IDelta> Deltas)
{
this.Deltas = new List<IDelta>(Deltas);
}
protected MemoryDeltaStore()
{
}
//TODO fix the use of MemoryDb
public MemoryDeltaStore(DeltaStoreSettings deltaStoreSettings) : base(deltaStoreSettings)
{
}
public async override Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
foreach (IDelta delta in deltas)
{
cancellationToken.ThrowIfCancellationRequested();
Deltas.Add(new Delta(delta));
}
}
public override Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string identity, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var result = Deltas.Where(d => d.Index.CompareTo(startindex) > 0 && string.Compare(d.Identity, identity, StringComparison.Ordinal) != 0);
return Task.FromResult(result.Cast<IDelta>());
}
public override Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return Task.FromResult(Deltas.Where(d => d.Index.CompareTo(startindex) > 0).ToList().Cast<IDelta>());
}
Guid LastProcessedDelta;
public override async Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken = default)
{
return LastProcessedDelta;
}
public override async Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
LastProcessedDelta = Index;
}
Guid LastPushedDelta;
public async override Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken)
{
return LastPushedDelta;
}
public async override Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
LastPushedDelta = Index;
}
public async override Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return Deltas.Count(d => d.Index.CompareTo(startindex) > 0);
}
public async override Task PurgeDeltasAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
Deltas.Clear();
}
}
Now that we have a delta store in place, we need a data object, something that we can use to generate data and track how the data is changing, so again for test purposes, I have implemented a small in-memory database
SimpleDatabase
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/SimpleDatabase.cs
public class SimpleDatabase
{
public IDeltaProcessor DeltaProcessor { get; set; }
public string Identity { get; set; }
public IDeltaStore DeltaStore { get; set; }
public SimpleDatabase(IDeltaStore deltaStore, string identity, List<SimpleDatabaseRecord> Data)
{
Identity = identity;
DeltaStore = deltaStore;
this.Data= Data;
}
List<SimpleDatabaseRecord> Data;
public async void Update(SimpleDatabaseRecord Instance)
{
var ObjectToUpdate = Data.FirstOrDefault(x => x.Key == Instance.Key);
if (ObjectToUpdate != null)
{
var Index = Data.IndexOf(ObjectToUpdate);
Data[Index] = Instance;
SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Update, Instance);
await SaveDelta(item);
}
}
private async Task SaveDelta(SimpleDatabaseModification item)
{
var Delta = DeltaStore.CreateDelta(Identity,item);
await DeltaStore.SaveDeltasAsync(new List<IDelta>() { Delta }, default);
}
public void Delete(SimpleDatabaseRecord Instance)
{
var ObjectToDelete= Data.FirstOrDefault(x=>x.Key==Instance.Key);
if(ObjectToDelete!=null)
{
Data.Remove(ObjectToDelete);
}
}
public async Task Add(SimpleDatabaseRecord Instance)
{
Data.Add(Instance);
SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Add, Instance);
await SaveDelta(item);
}
}
In the class above I have implemented methods to add, delete and update a record. Inside each method I create an instance of an object called SimpleDatabaseModification, I used that object to keep track of which operation is happening and keep a copy of the instance being handle at the moment, that is what we are going to save as a delta.
SimpleDatabaseModification
public class SimpleDatabaseModification
{
public OperationType Operation { get; set; }
public SimpleDatabaseModification(OperationType operation, SimpleDatabaseRecord record)
{
Operation = operation;
Record = record;
}
public SimpleDatabaseRecord Record { get; set; }
}
Now since the SimpleDatabase is saving the records on a list the next step is to create a processor that gets the information out of the delta and use it to recreate that list, so here is the delta processor
SimpleDatabaseDeltaProcessor
public class SimpleDatabaseDeltaProcessor :DeltaProcessorBase
{
List<SimpleDatabaseRecord> _CurrentText;
public SimpleDatabaseDeltaProcessor(DeltaStoreSettings deltaStoreSettings, List<SimpleDatabaseRecord> CurrentData) : base(deltaStoreSettings)
{
_CurrentText= CurrentData;
}
public override Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
foreach (IDelta delta in deltas)
{
cancellationToken.ThrowIfCancellationRequested();
var Modification= this.GetDeltaOperations<SimpleDatabaseModification>(delta);
switch (Modification.Operation)
{
case OperationType.Add:
this._CurrentText.Add(Modification.Record);
break;
case OperationType.Delete:
var ObjectToDelete= this._CurrentText.FirstOrDefault(x=>x.Key==Modification.Record.Key);
this._CurrentText.Remove(ObjectToDelete);
break;
case OperationType.Update:
var ObjectToUpdate = this._CurrentText.FirstOrDefault(x => x.Key == Modification.Record.Key);
var Index= this._CurrentText.IndexOf(ObjectToUpdate);
this._CurrentText[Index] = Modification.Record;
break;
}
}
return Task.CompletedTask;
}
}
Well, that is for this post, in the next post we will create some test scenarios to test our implementations