OK, in the last post we defined the conceptual parts of a synchronization framework. Now let’s create the code that represents those parts.

Delta

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/IDelta.cs

 

/// <summary>
/// Represents a transaction made to the database 
/// </summary>
public interface IDelta
{
    /// <summary>
    /// The date when the operation happened
    /// </summary>
    double Epoch { get; set; }
    /// <summary>
    /// Who created the delta
    /// </summary>
    string Identity { get; set; }
    /// <summary>
    /// The unique identifier of the delta
    /// </summary>
    Guid Index { get; }
    /// <summary>
    /// The database transaction(s) that represents this delta
    /// </summary>
    byte[] Operation { get; set; }  
}

 

Epoch: The date when the operation happened, stored as a double

Identity: Who created the delta

Index: The unique identifier of the delta, a sortable GUID

Operation: The database transaction(s) that represents this delta
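
To make the four members concrete, here is a minimal sketch of a class that implements the interface. The class name SimpleDelta is hypothetical and not part of the framework. The sketch also assumes that Epoch holds Unix time in seconds, which the interface itself does not specify, and it leaves the generation of sortable GUIDs to the caller.

```csharp
using System;

// Compact copy of the interface above so the sketch compiles on its own.
public interface IDelta
{
    double Epoch { get; set; }
    string Identity { get; set; }
    Guid Index { get; }
    byte[] Operation { get; set; }
}

// Hypothetical implementation; SimpleDelta is not a framework type.
public class SimpleDelta : IDelta
{
    public SimpleDelta(Guid index, string identity, byte[] operation, DateTimeOffset createdAt)
    {
        // The caller supplies the index; producing sortable GUIDs is out of scope here.
        Index = index;
        Identity = identity;
        Operation = operation;
        // Assumption: Epoch is Unix time in seconds, stored as a double.
        Epoch = createdAt.ToUnixTimeMilliseconds() / 1000.0;
    }

    public double Epoch { get; set; }
    public string Identity { get; set; }
    public Guid Index { get; }
    public byte[] Operation { get; set; }
}
```

Index only has a getter, so once a delta is created its identifier cannot change, while the other members stay writable as the interface requires.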

 

Delta Processor

https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaProcessor.cs

public interface IDeltaProcessor
{
    /// <summary>
    /// Extracts the content of an IEnumerable of deltas and processes it on the current data object
    /// </summary>
    /// <param name="deltas">an IEnumerable of deltas</param>
    /// <param name="cancellationToken">Cancellation token</param>
    /// <returns>An empty task</returns>
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

As you can see, the delta processor is really simple: it contains only one method, which is in charge of getting the content of a group of deltas and processing those differences in the current data object.
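
As an illustration, the following sketch shows what a toy processor could look like. Everything except the two interfaces is hypothetical: the "data object" is just an in-memory list, and the sketch assumes that Operation contains UTF-8 text, which is a simplification made for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Compact copies of the interfaces above so the sketch compiles on its own.
public interface IDelta
{
    double Epoch { get; set; }
    string Identity { get; set; }
    Guid Index { get; }
    byte[] Operation { get; set; }
}

public interface IDeltaProcessor
{
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

// Hypothetical delta whose Operation is UTF-8 text (an assumption for this example).
public class TextDelta : IDelta
{
    public TextDelta(string identity, string statement)
    {
        Identity = identity;
        Operation = Encoding.UTF8.GetBytes(statement);
    }

    public double Epoch { get; set; }
    public string Identity { get; set; }
    public Guid Index { get; } = Guid.NewGuid();
    public byte[] Operation { get; set; }
}

// Hypothetical processor: the "data object" is an in-memory list of statements.
public class InMemoryDeltaProcessor : IDeltaProcessor
{
    public List<string> AppliedStatements { get; } = new List<string>();

    public Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        foreach (IDelta delta in deltas)
        {
            // Stop between deltas if the caller cancelled the operation.
            cancellationToken.ThrowIfCancellationRequested();
            // Decode the payload and "apply" it by recording it.
            AppliedStatements.Add(Encoding.UTF8.GetString(delta.Operation));
        }
        return Task.CompletedTask;
    }
}
```

A real processor would replay each operation against a database instead of a list, but the shape of the method stays the same.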

 

Delta Store

https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaStore.cs

public interface IDeltaStore
    {
        string Identity { get; }
        void SetIdentity(string Identity);
        /// <summary>
        /// Saves the given deltas in the current store
        /// </summary>
        /// <param name="deltas">The deltas to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
        /// <summary>
        /// Gets the deltas generated by other nodes with indices greater than the start index
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="myIdentity">The identity of the current node </param>
        /// <param name="cancellationToken">a Cancellation token</param>
        /// <returns>An IEnumerable with deltas generated by other nodes</returns>
        Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken);
        /// <summary>
        /// Gets all deltas in the store with an index greater than the start index
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="cancellationToken">a cancellation token</param>
        /// <returns>An IEnumerable of deltas</returns>
        Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken);
        /// <summary>
        /// Gets the count of deltas with indices greater than the start index
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>The count</returns>
        Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken);
        /// <summary>
        /// Gets the index of the last delta processed by this data object
        /// </summary>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>The index of the last delta processed by this data object</returns>
        Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken);
        /// <summary>
        /// Sets the index of the last delta processed by this data object
        /// </summary>
        /// <param name="Index">The index to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken);
        /// <summary>
        ///  Gets the index of the last delta pushed to the server node
        /// </summary>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>the index of the last delta pushed to the server node</returns>
        Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken);
        /// <summary>
        /// Sets the index of the last delta pushed to the server node
        /// </summary>
        /// <param name="Index">The index to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken);
        /// <summary>
        /// Deletes all deltas in the store
        /// </summary>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task PurgeDeltasAsync(CancellationToken cancellationToken);

    }

SaveDeltasAsync: Saves the given deltas in the current store

GetDeltasFromOtherNodes: Gets the deltas generated by other nodes with indices greater than the start index

GetDeltasAsync: Gets all deltas in the store with an index greater than the start index

GetDeltaCountAsync: Gets the count of deltas with indices greater than the start index

GetLastProcessedDeltaAsync: Gets the index of the last delta processed by this data object

SetLastProcessedDeltaAsync: Sets the index of the last delta processed by this data object

GetLastPushedDeltaAsync: Gets the index of the last delta pushed to the server node

SetLastPushedDeltaAsync: Sets the index of the last delta pushed to the server node

PurgeDeltasAsync: Deletes all deltas in the store
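
To show how these methods fit together, here is a rough in-memory sketch of a store. The names InMemoryDeltaStore and SampleDelta are hypothetical, the class is not thread-safe, and it assumes that comparing indices with Guid.CompareTo matches the ordering the sortable GUIDs are meant to have, which may not hold for every GUID generation scheme.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Compact copies of the interfaces above so the sketch compiles on its own.
public interface IDelta
{
    double Epoch { get; set; }
    string Identity { get; set; }
    Guid Index { get; }
    byte[] Operation { get; set; }
}

public interface IDeltaStore
{
    string Identity { get; }
    void SetIdentity(string Identity);
    Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
    Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken);
    Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken);
    Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken);
    Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken);
    Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken);
    Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken);
    Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken);
    Task PurgeDeltasAsync(CancellationToken cancellationToken);
}

// Hypothetical delta used only to exercise the store.
public class SampleDelta : IDelta
{
    public SampleDelta(Guid index, string identity) { Index = index; Identity = identity; }
    public double Epoch { get; set; }
    public string Identity { get; set; }
    public Guid Index { get; }
    public byte[] Operation { get; set; } = Array.Empty<byte>();
}

// Hypothetical store that keeps everything in memory; not thread-safe.
public class InMemoryDeltaStore : IDeltaStore
{
    private readonly List<IDelta> _deltas = new List<IDelta>();
    private Guid _lastProcessed = Guid.Empty;
    private Guid _lastPushed = Guid.Empty;

    public string Identity { get; private set; } = "";
    public void SetIdentity(string Identity) => this.Identity = Identity;

    public Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        _deltas.AddRange(deltas);
        return Task.CompletedTask;
    }

    public Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken) =>
        Task.FromResult(After(startindex).Where(d => d.Identity != myIdentity));

    public Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken) =>
        Task.FromResult(After(startindex));

    public Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken) =>
        Task.FromResult(After(startindex).Count());

    public Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken) => Task.FromResult(_lastProcessed);
    public Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken) { _lastProcessed = Index; return Task.CompletedTask; }
    public Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken) => Task.FromResult(_lastPushed);
    public Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken) { _lastPushed = Index; return Task.CompletedTask; }
    public Task PurgeDeltasAsync(CancellationToken cancellationToken) { _deltas.Clear(); return Task.CompletedTask; }

    // Assumption: Guid.CompareTo reflects the intended order of the sortable indices.
    private IEnumerable<IDelta> After(Guid startindex) =>
        _deltas.Where(d => d.Index.CompareTo(startindex) > 0).OrderBy(d => d.Index).ToList();
}
```

The two "last" indices act as bookmarks: a node can ask for everything after the last delta it processed, and push everything after the last delta it already sent, without rescanning the whole store.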

That’s all for this post. In the next post, we will define the base classes that implement the interfaces described above.