Synchronization Framework Base Classes

Synchronization Framework Base Classes

Now that we have defined the bases contracts necessary for synchronization, we can define some base classes that implement those contracts, the main idea behind these base classes is to, later on, add the possibility to inject configurations with .net dependency injection.

Let’s start with the delta implementation

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Delta.cs

/// <summary>
    /// An implementation of the IDelta interface, this class is primary used for serialization and transportation purpose 
    /// </summary>
    public class Delta : IDelta
    {
        public Delta()
        {
        }
        public static Guid GenerateComb()
        {
            return Provider.PostgreSql.Create();
           
        }
        public Delta(string identity, byte[] operation, bool processed = false)
        {

            Identity = identity;
            Operation = operation;
            Processed = processed;
        }
        public Delta(IDelta Delta)
        {

            Identity = Delta.Identity;
            Index = Delta.Index;
            Operation = Delta.Operation;
          

        }
        public Delta(string identity, Guid index, byte[] operation, bool processed = false)
        {

            Identity = identity;
            Index = index;
            Operation = operation;
            Processed = processed;
        }
        public virtual DateTime Date { get; set; }
        public virtual string Identity { get; set; }

        public virtual Guid Index { get; set; }

        public virtual byte[] Operation { get; set; }
        public virtual bool Processed { get; set; }
        public virtual double Epoch { get; set; }

    }

Now the delta store

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/DeltaStoreBase.cs

   public abstract class DeltaStoreBase : IDeltaStore
   {

       protected DeltaStoreBase()
       {

       }
       protected DeltaStoreSettings _deltaStoreSettings;

       public string Identity { get; private set; }

       public DeltaStoreBase(DeltaStoreSettings deltaStoreSettings)
       {
           this._deltaStoreSettings = deltaStoreSettings;
           Setup();
       }
       protected virtual void Setup()
       {

       }
       public abstract Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken = default);

       public abstract Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string identity, CancellationToken cancellationToken = default);
       //public abstract Task<IEnumerable<IDelta>> GetDeltasToSendAsync(Guid startindex, CancellationToken cancellationToken = default);
       public abstract Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken = default);
       public abstract Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken = default);

       public abstract Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken = default);

       public void SetIdentity(string Identity)
       {
           this.Identity = Identity;
       }

       public abstract Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken = default);
       public abstract Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken = default);

       public abstract Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken=default);
       public abstract Task PurgeDeltasAsync(CancellationToken cancellationToken);
   }

and finally, the delta processor

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/DeltaProcessorBase.cs

public abstract class DeltaProcessorBase : IDeltaProcessor
{
    protected DeltaStoreSettings _deltaStoreSettings;
    public DeltaProcessorBase(DeltaStoreSettings deltaStoreSettings)
    {
        _deltaStoreSettings = deltaStoreSettings;
    }

    public abstract Task ProcessDeltasAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken);

}

That’s it for this post, see you on the next post “Planning the first implementation”

 

Let’s write a Synchronization Framework in C#

Let’s write a Synchronization Framework in C#

Ok in the last post we defined the conceptual parts of a synchronization framework, now let’s create the code that represents those parts

Delta

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/IDelta.cs

 

/// <summary>
/// Represents a transaction made to the database 
/// </summary>
public interface IDelta
{
   double Epoch { get; set; }
    /// <summary>
    /// Who created the delta
    /// </summary>
    string Identity { get; set; }
    /// <summary>
    /// The unique identifier of the delta
    /// </summary>
    Guid Index { get; }
    /// <summary>
    /// The database transaction(s) that represents this delta
    /// </summary>
    byte[] Operation { get; set; }  
}

 

Epoch: The date when the operation happened

Identity: Who created the delta

Index: A sortable GUID

Operation: The database transaction(s) that represents this delta

 

Delta Processor

https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaProcessor.cs

public interface IDeltaProcessor
{
    /// <summary>
    /// Extracts the content of an IEnumerable of deltas and process it on the current data object
    /// </summary>
    /// <param name="deltas">an IEnumerable of deltas</param>
    /// <param name="cancellationToken">Cancellation token</param>
    /// <returns>An empty task</returns>
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

As you can see the delta processor is really simple, it only contains one method that is in charge of getting the content of a group of deltas and process those differences in the current data object

 

Delta Store

https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaStore.cs

public interface IDeltaStore
    {
        string Identity { get; }
        void SetIdentity(string Identity);
        /// <summary>
        /// Saves the IEnumerable<IDelta> of deltas in the current store
        /// </summary>
        /// <param name="deltas">The IEnumerable<IDelta> to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
        /// <summary>
        /// Gets an IEnumerable<IDelta> of deltas generated by other nodes with indeces greater than the start index 
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="myIdentity">The identity of the current node </param>
        /// <param name="cancellationToken">a Cancellation token</param>
        /// <returns>An IEnumerable with deltas generated by other nodes</returns>
        Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken);
        /// <summary>
        /// Get all deltas in the store with an index greater than the start index
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="cancellationToken">a cancellation token</param>
        /// <returns>An IEnumerable of deltas</returns>
        Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken);
        /// <summary>
        /// Gets the count of deltas with indeces greater that the start index
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>The count</returns>
        Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken);
        /// <summary>
        /// Gets the index of the last delta process by this data object
        /// </summary>
        /// <param name="cancellationToken"> cancellation token</param>
        /// <returns>The index of the last delta process by this data object</returns>
        Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken);
        /// <summary>
        /// Sets the index of the last delta process by this data object
        /// </summary>
        /// <param name="Index">The index to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken);
        /// <summary>
        ///  Gets the index of the last delta pushed to the server node
        /// </summary>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>the index of the last delta pushed to the server node</returns>
        Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken);
        /// <summary>
        /// Sets the index of the last delta pushed to the server node
        /// </summary>
        /// <param name="Index">The index to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken);
        /// <summary>
        /// Delete all deltas in the store
        /// </summary>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task PurgeDeltasAsync(CancellationToken cancellationToken);

    }

SaveDeltasAsync :Saves the IEnumerable<IDelta> of deltas in the current store

GetDeltasFromOtherNodes: Gets an IEnumerable<IDelta> of deltas generated by other nodes with indices greater than the start index

GetDeltasAsync: Get all deltas in the store with an index greater than the start index

GetDeltaCountAsync: Gets the count of deltas with indices greater than the start index

GetLastProcessedDeltaAsync: Gets the index of the last delta process by this data object

SetLastProcessedDeltaAsync: Sets the index of the last delta process by this data object

GetLastPushedDeltaAsync: Gets the index of the last delta pushed to the server node

SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken): Sets the index of the last delta pushed to the server node

PurgeDeltasAsync: Delete all deltas in the store

That’s all for this post in the next post we will define the bases classes that implement the interfaces described above

Parts of a Synchronization Framework

Parts of a Synchronization Framework

In the last post, we talked about what are deltas and how by using them we can synchronize data structures.

So, in this post, I will describe the necessary parts needed to implement Delta-based synchronization, let’s start

  • Data Object: any database, object, graph, or file system that we are tracking for synchronization
  • Delta: a delta is a data difference between the original and the current state of a data object
  • Node: is a point of the synchronization network, there are 2 types of nodes
    1. Client node: a node that is able to produce and process deltas
    2. Server node: a node that is used only to exchange deltas, it can optionally process deltas too.
  • Delta Store: storage where you can save deltas so you can later exchange them with other nodes
  • Delta Processor: a utility that helps you includes the deltas created in other nodes in your current copy of the data object

Now let’s implement this concept to synchronize a database

Ok so each database that we want to synchronize will be a client node and a client node should be able to produce and store deltas and to process deltas produced by other nodes, so our database node should look like the following diagram

The server node can be as simple as just delta storage exposed in an HTTP API as you can see in the basic server node diagram or use several delta storages and delta processors as show on the complex server node diagram

                                         

 

And with those diagrams, I finish this post, in the next post we will implement those concepts in C# code

Data synchronization in a few words

Data synchronization in a few words

To Synchronize data is one of the most challenging tasks out there, especially if you are working with LOB applications

There are many ways to synchronize data, the most common technique is to compare records by modification date and then merge the data to create a master record.

Here the main problem is that you have to have a way to compare each type of record, so it gets cumbersome as soon as the number of types in your application begins to grow.

Also, you have to have a log of what gets created and what gets deleted so you can do the same changes on the other nodes

Delta-based synchronization

delta-based synchronization is a problem of identifying “what changed” between each execution of a sync process

A directed delta also called a change, is a sequence of (elementary) change operations which, when applied to one version V1, yields another version V2 (note the correspondence to transaction logs in databases). In computer implementations

In delta synchronization, there are 2 main tasks

  • Record the deltas (or small difference of data between nodes)
  • Transmit these differences to the other nodes so they can process them

Implementing Delta-based synchronization for relational databases

The schema above represents a blog’s database, it’s a really simple schema so its easy to understand, now this is the scenario

We have the main database that we will call “Master” and 2 other databases named client A and client B.

Now let insert data in the master

Each DML statement should be converted in a delta (or a data difference)

Δ1

Δ2

Δ3

Copy deltas Δ 1, Δ 2, Δ 3 to the clients

So, after processing the deltas on each client, the data in all databases should look like the picture below

 

 

So that’s it for this post, in the next post we will be examing each part that is necessary to do delta-based synchronization

 

 

 

How to fix “The type initializer for ‘Gdip’ threw an exception” caused by the netcore framework depencency,  when you run a Xaf Blazor App on ubuntu linux 18.04

How to fix “The type initializer for ‘Gdip’ threw an exception” caused by the netcore framework depencency, when you run a Xaf Blazor App on ubuntu linux 18.04

If you are running Xaf Blazor in ubuntu 18.04 you might have seen the following exception

The type initializer for ‘Gdip’ threw an exception.
at DevExpress.ExpressApp.Actions.ActionBase.OnHandleException(Exception e) at DevExpress.ExpressApp.Actions.ActionBase.ExecuteCore(Delegate handler, ActionBaseEventArgs eventArgs) at DevExpress.ExpressApp.Actions.PopupWindowShowAction.DoExecute(Window window) at DevExpress.ExpressApp.Actions.PopupWindowShowAction.DialogController_Accepting(Object sender, DialogControllerAcceptingEventArgs e) at DevExpress.ExpressApp.SystemModule.DialogController.Accept(SimpleActionExecuteEventArgs args) at DevExpress.ExpressApp.SystemModule.DialogController.acceptAction_OnExecute(Object sender, SimpleActionExecuteEventArgs e) at DevExpress.ExpressApp.Actions.SimpleAction.RaiseExecute(ActionBaseEventArgs eventArgs) at DevExpress.ExpressApp.Actions.ActionBase.ExecuteCore(Delegate handler, ActionBaseEventArgs eventArgs)

The error is caused by missing dependency, so the DotNet runtime itself will throw that exception. Also, I want to highlight that the exception is not related to XAF, you can read more about this problem here https://github.com/dotnet/runtime/issues/27200

To get the missing dependency just open a console and run the following commands

sudo apt-get update -y

sudo apt-get install -y libgdiplus