AspNetCore Kestrel: change your application port using a configuration file

Modern ASP.NET Core applications use the built-in web server Kestrel. By default, this server is bound to the localhost address, using ports 5000 and 5001 for HTTP and HTTPS respectively.

But what if you want to run 2 applications on the same server? Then you have a problem: if both applications use the default ports, one of them will fail to start because the ports are already in use.

This can easily be solved by changing the default ports in your host builder, as shown below:

public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
   .ConfigureWebHostDefaults(webBuilder => {
       webBuilder.UseUrls("http://0.0.0.0:8016");
       webBuilder.UseStartup<Startup>();
   });

The problem with the example above is that the URLs are hardcoded, so here is a better solution

public static IHostBuilder CreateHostBuilder(string[] args) =>
  Host.CreateDefaultBuilder(args)
  .ConfigureWebHostDefaults(webBuilder => {
      var config = new ConfigurationBuilder()
      .SetBasePath(Directory.GetCurrentDirectory())
      .AddJsonFile("hosting.json", optional: true)
      .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
      .AddCommandLine(args)
      .AddEnvironmentVariables()
      .Build();

      webBuilder.UseUrls(config["server.urls"]);
      webBuilder.UseStartup<Startup>();
  });

The example above uses a configuration builder to merge appsettings.json, hosting.json, the command-line arguments and the environment variables into a single configuration object, and then uses the value of the “server.urls” property as the base URL and port for our application.

Here is the content of the hosting.json file

{
    "server.urls": "http://0.0.0.0:8016" 
}
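
Since the configuration builder also reads the command-line arguments, the value stored in hosting.json can be replaced at launch time. Here is a minimal sketch of that behavior, not part of the original setup. It assumes the Microsoft.Extensions.Configuration packages that ASP.NET Core already references, and it uses an in-memory source as a stand-in for hosting.json so it needs no files. The names UrlConfigDemo and ResolveUrls are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

public static class UrlConfigDemo
{
    // Resolves "server.urls" from two sources; the source added last wins.
    public static string ResolveUrls(string[] args)
    {
        var config = new ConfigurationBuilder()
            // Stand-in for hosting.json (assumption: same key and value as above).
            .AddInMemoryCollection(new Dictionary<string, string>
            {
                ["server.urls"] = "http://0.0.0.0:8016"
            })
            // Added later, so a command-line value overrides the one above.
            .AddCommandLine(args)
            .Build();

        return config["server.urls"];
    }

    public static void Main()
    {
        // No arguments: the value from the first source is used.
        Console.WriteLine(ResolveUrls(new string[0]));
        // With an argument: the command-line value replaces it.
        Console.WriteLine(ResolveUrls(new[] { "--server.urls=http://0.0.0.0:9000" }));
    }
}
```

In the builder shown earlier the environment variables are added after the command line, so an environment variable with the same key would take precedence over both.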

 

 

 

Blazor: Use tag helpers to include JavaScript and CSS in your HTML header

Sometimes we want to reuse our Blazor components in other apps. The best way to do this is to create a Razor class library. Creating a Razor class library is no different from creating a normal class library to share code, with one exception: Razor components might need to reference JavaScript or CSS files. This problem can easily be solved in 2 steps, as shown below.

1) Create a class that inherits from TagHelperComponent. This class should include the tags that you want to add to the HTML head section of your app.

using Microsoft.AspNetCore.Html;
using Microsoft.AspNetCore.Razor.TagHelpers;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;


namespace MyBlazorApp
{
  [HtmlTargetElement("head")]
  public class MyTagHelper: TagHelperComponent
  {
    // Verbatim string: every double quote inside the HTML must be doubled.
    private string Tags =
@"
        <!-- ZXingBlazor -->
        <script src=""_content/ZXingBlazor/lib/barcodereader/zxing.js""></script>
        <script src=""_content/ZXingBlazor/lib/barcodereader/barcode.js""></script>
        <!-- ZXingBlazor -->
        <!-- Signature Pad -->
        <script src=""_content/Mobsites.Blazor.SignaturePad/bundle.js""></script>
        <link href=""_content/Mobsites.Blazor.SignaturePad/bundle.css"" rel=""stylesheet"" />
        <script src=""_content/Ultra.PropertyEditors.Module.Blazor/js/signaturepropertyeditor.js""></script>
        <!-- Signature Pad -->
        <!-- HTML Editor -->
        <link href=""//cdn.quilljs.com/1.3.6/quill.snow.css"" rel=""stylesheet"">
        <link href=""//cdn.quilljs.com/1.3.6/quill.bubble.css"" rel=""stylesheet"">
        <script src=""https://cdn.quilljs.com/1.3.6/quill.js""></script>
        <script src=""_content/Blazored.TextEditor/quill-blot-formatter.min.js""></script>
        <script src=""_content/Blazored.TextEditor/Blazored-BlazorQuill.js""></script>
        <!-- HTML Editor -->
";
    public override Task ProcessAsync(TagHelperContext context, TagHelperOutput output)
    {
      if (string.Equals(context.TagName, "head", StringComparison.OrdinalIgnoreCase))
      {
        output.PostContent.AppendHtml(Tags).AppendLine();
      }
      return Task.CompletedTask;
    }
  }
}

*Note: to reference JavaScript or CSS from any Razor library you can use the following syntax. Please notice the doubled double quotes: they are required because the tags live inside a C# verbatim string.

<script src=""_content/MyAssemblyName/PathToMyJavaScript/MyJavaScriptFile.js""></script>

 

2) Create an extension method in the “Microsoft.Extensions.DependencyInjection” namespace so you can easily add your tag helper to the service collection

 

using Microsoft.AspNetCore.Razor.TagHelpers;
using Microsoft.Extensions.DependencyInjection;
using MyBlazorApp; // namespace that contains MyTagHelper
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.Extensions.DependencyInjection
{
  public static class StartupExtensions
  {
    public static IServiceCollection AddMyHtmlTags(this IServiceCollection services)
    {
      services.AddTransient<ITagHelperComponent, MyTagHelper>();
      return services;
    }
  }
}

 

Here is an example of how to use your new extension in your Startup class:

 public void ConfigureServices(IServiceCollection services)
 {
   services.AddMyHtmlTags();
 }

 

Log XPO queries in a Netcore app (3,5,6)

The LogLevel section in the appsettings.json file does not affect the .NET Framework tracing mechanism, which XPO uses to log its queries. Still, we have a few workarounds for a .NET Core app:

  1. We can implement our own logger as shown here https://docs.devexpress.com/XPO/403928/best-practices/how-to-log-sql-queries#xpo-logger-net
  2. We can set the value of the logging switch by reflection using the following snippet
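// Requires: using System.Diagnostics; using System.Reflection; using DevExpress.Xpo.DB;
private static void EnableXpoDebugLog()
{
    // "xpoSwitch" is a private static field, so it may be renamed or removed in a future XPO version.
    FieldInfo xpoSwitchF = typeof(ConnectionProviderSql).GetField("xpoSwitch", BindingFlags.Static | BindingFlags.NonPublic);
    // Only change the level when the field was found and holds a TraceSwitch.
    if (xpoSwitchF?.GetValue(null) is TraceSwitch xpoSwitch)
    {
        xpoSwitch.Level = TraceLevel.Info;
    }
}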

I actually use the second workaround in my own projects.
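
Turning the switch on only decides whether messages are written; you still need a trace listener that sends them somewhere you can read them. Here is a minimal sketch of that idea. It assumes that XPO writes through System.Diagnostics.Trace, as described above. It does not reference XPO at all: DemoSwitch is a hypothetical stand-in for the private XPO switch, and the SQL text is just a sample message.

```csharp
using System;
using System.Diagnostics;

public static class TraceToConsoleDemo
{
    // Hypothetical switch used instead of XPO's private "xpoSwitch" field.
    private static readonly TraceSwitch DemoSwitch =
        new TraceSwitch("DemoSwitch", "Stand-in for the XPO logging switch");

    // Sends trace output to the console and raises the switch to Info.
    public static bool EnableInfoLogging()
    {
        Trace.Listeners.Add(new TextWriterTraceListener(Console.Out));
        Trace.AutoFlush = true;
        DemoSwitch.Level = TraceLevel.Info;
        return DemoSwitch.TraceInfo;
    }

    public static void Main()
    {
        EnableInfoLogging();
        // Written only because the switch level is Info or higher.
        Trace.WriteLineIf(DemoSwitch.TraceInfo, "select * from Customer");
    }
}
```

In a real app you would call EnableXpoDebugLog() instead of setting DemoSwitch, and keep the listener registration.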

If you want to learn more about the logging mechanism of XPO, you can take a look at the following articles:

https://supportcenter.devexpress.com/ticket/details/t1033081/how-to-log-the-sql-queries-made-by-xpo-in-net-5-applications

https://supportcenter.devexpress.com/ticket/details/t913939/enable-logging-in-xaf-blazor

Implementing database synchronization with entity framework core

Ok, so far, our synchronization framework is only implemented for an in-memory database that we use for testing purposes.

Now let’s implement a different use case: let’s add synchronization functionality to an Entity Framework Core DbContext.

As I explained before, the key part of synchronizing data using delta encoding is being able to track the changes that happen to a data object, in this case, a relational database.

These are the tasks that we need to complete to accomplish our goal:

  1. Find out how entity framework converts the changes that happen to the objects to SQL commands
  2. Decide what information we need to track and save as a delta
  3. Create the infrastructure to save deltas (IDeltaStore)
  4. Create the infrastructure to process deltas (IDeltaProcessor)
  5. Implement the synchronization node functionality in an Entity Framework DbContext(ISyncClientNode)
  6. Create a test scenario

 

1 Find out how entity framework converts the changes that happen to the objects to SQL commands

In our companies (BitFrameworks & Xari) we have been working on data synchronization for a while, but all this work has been done in the XPO realm.

We know that most ORM frameworks have a layer in charge of translating the changes made to objects into SQL commands; the trick is to locate this layer. So while I was trapped in Mexico waiting for a flight back to Phoenix, I decided to dig into Entity Framework Core’s GitHub repository, and this is what I found: https://github.com/dotnet/efcore/blob/b18a7efa7c418e43184db08c6d1488d6600054cb/src/EFCore.Relational/Update/Internal/BatchExecutor.cs#L161

public virtual async Task<int> ExecuteAsync(
           IEnumerable<ModificationCommandBatch> commandBatches,
           IRelationalConnection connection,
           CancellationToken cancellationToken = default)

As you can see, one of the parameters is an IEnumerable of ModificationCommandBatch https://github.com/dotnet/efcore/blob/main/src/EFCore.Relational/Update/ModificationCommandBatch.cs. This command batch exposes a read-only list of modification commands (ModificationCommand):

https://github.com/dotnet/efcore/blob/cc53b3e80755e5d882bb21ef10e0e0e33194d9bd/src/EFCore.Relational/Update/ModificationCommandBatch.cs#L30

public abstract class ModificationCommandBatch
{
    /// <summary>
    ///     The list of conceptual insert/update/delete <see cref="ModificationCommands" />s in the batch.
    /// </summary>
    public abstract IReadOnlyList<IReadOnlyModificationCommand> ModificationCommands { get; }

Now let’s take a look at ModificationCommand https://github.com/dotnet/efcore/blob/main/src/EFCore.Relational/Update/ModificationCommand.cs. This class provides all the information about the changes that will be converted into SQL commands, which means that if we serialize this object and save it as a delta, we can then send it to another node and replicate the changes… VOILA!!!

Now here is a stone in our path: the class https://github.com/dotnet/efcore/blob/main/src/EFCore.Relational/Update/ModificationCommand.cs is not serializable or, to put it better, NOT easily serializable, so let’s stop here for a moment and move on to a different task.

So now we know where the changes that we need to keep track of are. Next, let’s try to understand how those changes are converted into SQL commands and then executed against the database.

2 Decide what information we need to track and save as a delta

Entity Framework Core uses dependency injection to be able to handle different database engines. The idea is that there are a lot of small services that can be replaced in order to create a different implementation, for example, SQLite, SqlServer, Postgres, etc.

After a lot of digging, I found that the service in charge of generating the update commands (insert, update and delete) is UpdateSqlGenerator:

https://github.com/dotnet/efcore/blob/main/src/EFCore.Relational/Update/UpdateSqlGenerator.cs

This class implements IUpdateSqlGenerator https://github.com/dotnet/efcore/blob/main/src/EFCore.Relational/Update/IUpdateSqlGenerator.cs and, as you can see, the methods that append insert, update and delete operations receive a string builder and a modification command. So this is the service in charge of translating the ModificationCommand into SQL commands. SQL commands are easy to serialize because they are just text, so this is what we are going to serialize and save as a delta.

    public interface IUpdateSqlGenerator
    {
        /// <summary>
        ///     Generates SQL that will obtain the next value in the given sequence.
        /// </summary>
        /// <param name="name">The name of the sequence.</param>
        /// <param name="schema">The schema that contains the sequence, or <see langword="null" /> to use the default schema.</param>
        /// <returns>The SQL.</returns>
        string GenerateNextSequenceValueOperation(string name, string? schema);

        /// <summary>
        ///     Generates a SQL fragment that will get the next value from the given sequence and appends it to
        ///     the full command being built by the given <see cref="StringBuilder" />.
        /// </summary>
        /// <param name="commandStringBuilder">The builder to which the SQL fragment should be appended.</param>
        /// <param name="name">The name of the sequence.</param>
        /// <param name="schema">The schema that contains the sequence, or <see langword="null" /> to use the default schema.</param>
        void AppendNextSequenceValueOperation(
            StringBuilder commandStringBuilder,
            string name,
            string? schema);

        /// <summary>
        ///     Appends a SQL fragment for the start of a batch to
        ///     the full command being built by the given <see cref="StringBuilder" />.
        /// </summary>
        /// <param name="commandStringBuilder">The builder to which the SQL fragment should be appended.</param>
        void AppendBatchHeader(StringBuilder commandStringBuilder);

        /// <summary>
        ///     Appends a SQL command for deleting a row to the commands being built.
        /// </summary>
        /// <param name="commandStringBuilder">The builder to which the SQL should be appended.</param>
        /// <param name="command">The command that represents the delete operation.</param>
        /// <param name="commandPosition">The ordinal of this command in the batch.</param>
        /// <returns>The <see cref="ResultSetMapping" /> for the command.</returns>
        ResultSetMapping AppendDeleteOperation(
            StringBuilder commandStringBuilder,
            IReadOnlyModificationCommand command,
            int commandPosition);

        /// <summary>
        ///     Appends a SQL command for inserting a row to the commands being built.
        /// </summary>
        /// <param name="commandStringBuilder">The builder to which the SQL should be appended.</param>
        /// <param name="command">The command that represents the delete operation.</param>
        /// <param name="commandPosition">The ordinal of this command in the batch.</param>
        /// <returns>The <see cref="ResultSetMapping" /> for the command.</returns>
        ResultSetMapping AppendInsertOperation(
            StringBuilder commandStringBuilder,
            IReadOnlyModificationCommand command,
            int commandPosition);

        /// <summary>
        ///     Appends a SQL command for updating a row to the commands being built.
        /// </summary>
        /// <param name="commandStringBuilder">The builder to which the SQL should be appended.</param>
        /// <param name="command">The command that represents the delete operation.</param>
        /// <param name="commandPosition">The ordinal of this command in the batch.</param>
        /// <returns>The <see cref="ResultSetMapping" /> for the command.</returns>
        ResultSetMapping AppendUpdateOperation(
            StringBuilder commandStringBuilder,
            IReadOnlyModificationCommand command,
            int commandPosition);
    }

3 Create the infrastructure to save deltas (Implementing IDeltaStore)

Now it is time to create a delta store. This is an easy one, since we only need to inherit from our delta store base class and save the information in an Entity Framework DbContext. Here is the implementation:

https://github.com/egarim/SyncFramework/blob/main/src/EntityFrameworkCore/BIT.Data.Sync.EfCore/EFDeltaStore.cs

If you want to compare it with other delta store implementations, you can take a look at the in-memory version here:

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/MemoryDeltaStore.cs

4 Create the infrastructure to process deltas (implementing IDeltaProcessor)

So far, we know what we need to store in the deltas: basically, SQL commands and their parameters. This means that, to process those SQL commands, our delta processor needs to create a database connection and execute them.

https://github.com/egarim/SyncFramework/blob/main/src/EntityFrameworkCore/BIT.Data.Sync.EfCore/EFDeltaProcessor.cs

public EFDeltaProcessor(DbContext dBContext) 
{
    _dBContext = dBContext;
  
}
public EFDeltaProcessor(string connectionstring, string DbEngineAlias, string ProviderInvariantName)
{

    this.CurrentDbEngine = DbEngineAlias;
    this.connectionString = connectionstring;

    try
    {
        factory = DbProviderFactories.GetFactory(ProviderInvariantName);
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex.Message);
        throw new Exception("There was a problem creating the database connection using DbProviderFactories.GetFactory. Please make sure the DbProviderFactory for your database is registered https://docs.microsoft.com/en-us/dotnet/api/system.data.common.dbproviderfactories.registerfactory?view=net-5.0", ex);
    }
    //TODO check provider registration later

    //DbProviderFactories.RegisterFactory("Microsoft.Data.SqlClient", SqlClientFactory.Instance);
}

There are a few things to notice in that class. First, it has 2 constructors because we need 2 different ways to create the connection to the database: one using the Entity Framework DbContext and one using the ADO.NET DbProviderFactory.

All the magic happens in the ProcessDeltas method. This method is in charge of extracting the content of the deltas, transforming it into SQL commands and parameters, and then executing those commands.

Please notice that the content of each delta is an instance of ModificationCommandData

https://github.com/egarim/SyncFramework/blob/main/src/EntityFrameworkCore/BIT.Data.Sync.EfCore/Data/ModificationCommandData.cs

which is a class that allows us to store multiple SQL commands (for different database engines) and their parameters
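
To give an idea of what such a delta payload can look like, here is a simplified sketch. It is not the real ModificationCommandData class from the repository, which may be structured differently. The names SqlCommandPayload, SqlByEngine and Parameters, the engine aliases and the sample SQL are all assumptions made for this illustration. The point is only that SQL text plus parameters survive a round trip through a serializer, which is what a delta needs.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical, simplified payload (not the repository's ModificationCommandData).
public class SqlCommandPayload
{
    // SQL text keyed by a database engine alias, for example "sqlite".
    public Dictionary<string, string> SqlByEngine { get; set; } = new Dictionary<string, string>();

    // Parameter values keyed by parameter name, kept as strings for simplicity.
    public Dictionary<string, string> Parameters { get; set; } = new Dictionary<string, string>();
}

public static class PayloadDemo
{
    // Serializes the payload to JSON and reads it back, as a delta store would.
    public static SqlCommandPayload RoundTrip(SqlCommandPayload payload)
    {
        string json = JsonSerializer.Serialize(payload);
        return JsonSerializer.Deserialize<SqlCommandPayload>(json);
    }

    public static void Main()
    {
        var payload = new SqlCommandPayload();
        payload.SqlByEngine["sqlite"] = "UPDATE \"Blogs\" SET \"Name\" = @p0 WHERE \"Id\" = @p1;";
        payload.SqlByEngine["sqlserver"] = "UPDATE [Blogs] SET [Name] = @p0 WHERE [Id] = @p1;";
        payload.Parameters["@p0"] = "New name";
        payload.Parameters["@p1"] = "1";

        SqlCommandPayload copy = RoundTrip(payload);
        Console.WriteLine(copy.SqlByEngine["sqlite"]);
    }
}
```

Keeping one SQL text per engine is what lets a node running on one database engine replay changes that were generated on another.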

5 Implement the synchronization node functionality in an Entity Framework DbContext(ISyncClientNode)

At this point we are able to produce and process deltas for Entity Framework Core relational databases, so the next step is to implement the functionality of a synchronization client node by implementing the following interface:

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Client/ISyncClientNode.cs

namespace BIT.Data.Sync.Client
{
    public interface ISyncClientNode
    {
        IDeltaProcessor DeltaProcessor { get; }
        IDeltaStore DeltaStore { get; }
        ISyncFrameworkClient SyncFrameworkClient { get; }
        string Identity { get;  }

    }
}

https://github.com/egarim/SyncFramework/blob/main/src/EntityFrameworkCore/BIT.Data.Sync.EfCore/SyncFrameworkDbContext.cs

The server-side

I’m not going to show the implementation of the server since that implementation is generic and uses the same delta store and delta processor that we created at the beginning of this article. For more information, check the following links:

Adding network support

https://www.jocheojeda.com/2021/10/17/syncframework-adding-network-support/

Testing network support

https://www.youtube.com/watch?v=mSl0n0O5QIg&t=4s

 

The next post is going to be a video testing a simple synchronization scenario. See you in the next post!!!

 

 

SyncFramework – Adding network support

So far, all our tests exist inside the same process, so the nodes communicate through variables. In real-life scenarios, nodes won’t exist in the same process and, most of the time, not even in the same location.

The easiest and most standard way to implement client-server communication in the dotnet world is a REST API, so let’s define an API client and a server service. Let’s start with the API client.

ISyncFrameworkClient

https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/ISyncFrameworkClient.cs

public interface ISyncFrameworkClient
{
    Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken);
    Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken);
}

As you can see in the code above, this interface is really simple because the client only implements 2 operations

  • Fetch: downloads the deltas from the server
  • Push: uploads the deltas from our delta store to the server

Now we should create an implementation for this interface, see the code below

https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/SyncFrameworkHttpClient.cs

 

public class SyncFrameworkHttpClient : ISyncFrameworkClient
{
    HttpClient _httpClient;
    public string DeltaStoreId { get; }
    public SyncFrameworkHttpClient(HttpClient httpClient,string NodeId)
    {
        this.DeltaStoreId = NodeId;
        _httpClient = httpClient;
        // Every request carries the node id so the server can route it.
        _httpClient.DefaultRequestHeaders.Add("NodeId", NodeId);
    }
    public SyncFrameworkHttpClient(string BaseAddress, string DeltaStoreId):this(new HttpClient() { BaseAddress=new Uri(BaseAddress)},DeltaStoreId)
    {
       
    }
    public virtual async Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken  = default)
    {
       
        try
        {
            List<Delta> toserialzie = new List<Delta>();
            foreach (IDelta delta in Deltas)
            {
                toserialzie.Add(new Delta(delta));
            }
            cancellationToken.ThrowIfCancellationRequested();

            DataContractJsonSerializer js = new DataContractJsonSerializer(typeof(List<Delta>));
            MemoryStream msObj = new MemoryStream();
            js.WriteObject(msObj, toserialzie);
            msObj.Position = 0;
            StreamReader sr = new StreamReader(msObj);
            string jsonDeltas = sr.ReadToEnd();

            var data = new StringContent(jsonDeltas, Encoding.UTF8, "application/json");
            await _httpClient.PostAsync("/Sync/Push", data, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            var message = ex.Message;
            throw;
        }
      
    }
    public virtual async Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken = default)
    {
        var QueryParams = new Dictionary<string, string>();
        QueryParams.Add(nameof(startindex), startindex.ToString());
        QueryParams.Add(nameof(identity), identity);
        cancellationToken.ThrowIfCancellationRequested();
        var query = HttpUtility.ParseQueryString("");
        foreach (KeyValuePair<string, string> CurrentParam in QueryParams)
        {
            query[CurrentParam.Key] = CurrentParam.Value;
        }
        var reponse = await _httpClient.GetStringAsync($"/Sync/Fetch?{query.ToString()}").ConfigureAwait(false);

        using (var ms = new MemoryStream(Encoding.Unicode.GetBytes(reponse)))
        {

            DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(List<Delta>));
            List<Delta> Deltas = (List<Delta>)deserializer.ReadObject(ms);

            return Deltas;
        }

    }

}

 

It is an implementation of the ISyncFrameworkClient interface using HTTP communication:

  • Fetch: uses an HTTP get request
  • Push: uses an HTTP post request

Also, the “nodeid” header is added to the request, you will understand why when we implement the server part.
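
To make the Fetch request more concrete, here is a small sketch that builds the relative URL the same way FetchAsync does. It is only an illustration: FetchUrlDemo and BuildFetchUrl are made-up names, and "NodeA" is a sample identity.

```csharp
using System;
using System.Web;

public static class FetchUrlDemo
{
    // Builds the relative URL for a fetch request.
    public static string BuildFetchUrl(Guid startindex, string identity)
    {
        // ParseQueryString("") returns an empty, writable collection whose
        // ToString() URL-encodes every key and value.
        var query = HttpUtility.ParseQueryString("");
        query[nameof(startindex)] = startindex.ToString();
        query[nameof(identity)] = identity;
        return $"/Sync/Fetch?{query}";
    }

    public static void Main()
    {
        // Guid.Empty asks for every delta after the very first index.
        Console.WriteLine(BuildFetchUrl(Guid.Empty, "NodeA"));
    }
}
```

Building the query string through the collection, instead of concatenating strings, keeps identities with spaces or special characters valid in the URL.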

Now that we have defined the contract for the client and provided a base implementation using an HTTP client, it’s time to define what a client node is. Please take a look at the code below:

ISyncClientNodeExtensions

https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/ISyncClientNodeExtensions.cs

    public static class ISyncClientNodeExtensions
    {

        public static async Task<List<Delta>> FetchAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var LastDeltaIndex = await instance.DeltaStore.GetLastProcessedDeltaAsync(cancellationToken).ConfigureAwait(false);
            return await instance.SyncFrameworkClient.FetchAsync(LastDeltaIndex, instance.Identity, cancellationToken).ConfigureAwait(false);
        }
        public static async Task PullAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var Deltas = await instance.FetchAsync(cancellationToken).ConfigureAwait(false);
            if (Deltas.Any())
            {
                await instance.DeltaProcessor.ProcessDeltasAsync(Deltas, cancellationToken).ConfigureAwait(false);
                Guid index = Deltas.Max(d => d.Index);
                await instance.DeltaStore.SetLastProcessedDeltaAsync(index, cancellationToken).ConfigureAwait(false);
            }

        }
        public static async Task PushAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var LastPushedDelta = await instance.DeltaStore.GetLastPushedDeltaAsync(cancellationToken).ConfigureAwait(false);
            var Deltas = await instance.DeltaStore.GetDeltasAsync(LastPushedDelta,cancellationToken).ConfigureAwait(false);
            if (Deltas.Any())
            {
                var Max = Deltas.Max(d => d.Index);
                await instance.SyncFrameworkClient.PushAsync(Deltas, cancellationToken).ConfigureAwait(false);
                await instance.DeltaStore.SetLastPushedDeltaAsync(Max,cancellationToken).ConfigureAwait(false);
            }



        }
    }

So, this is how the SyncClientNode is structured.

Let’s move to the server side now. Here the idea is to be able to host multiple delta stores and delta processors, and also to be able to introduce custom logic when saving the deltas into the delta store or when processing the deltas into a data object.

ISyncServerNode

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/ISyncServerNode.cs

public interface ISyncServerNode
{
    string NodeId { get; set; }
    Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
    Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, string identity, CancellationToken cancellationToken);
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

Now we need to define a server, so here is the interface for the SyncServer

ISyncServer

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/ISyncServer.cs

public interface ISyncServer
{
    IEnumerable<ISyncServerNode> Nodes { get; }
    Task<IEnumerable<IDelta>> GetDeltasAsync(string name, Guid startindex, string identity, CancellationToken cancellationToken);
    Task ProcessDeltasAsync(string Name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
    Task SaveDeltasAsync(string name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

As you can see, the members are almost the same as the sync node, this design allows us to have more than one node on the server-side

Here is the implementation of the SyncServer

SyncServer

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/SyncServer.cs

public class SyncServer: ISyncServer
{


    IEnumerable<ISyncServerNode> _Nodes;
   
    public SyncServer(params ISyncServerNode[] Nodes)
    {
        this._Nodes = Nodes;
    }

    public IEnumerable<ISyncServerNode> Nodes => _Nodes;

   

    public async Task<IEnumerable<IDelta>> GetDeltasAsync(string NodeId, Guid startindex, string identity, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        ISyncServerNode Node = GetNode(NodeId);
        if (Node != null)
        {
            return await Node.GetDeltasAsync(startindex, identity, cancellationToken).ConfigureAwait(false);
        }

        IEnumerable<IDelta> result = new List<IDelta>();
        return result;
    }
    public Task ProcessDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        ISyncServerNode Node = GetNode(NodeId);
        if (Node != null)
        {
            return Node.ProcessDeltasAsync(deltas, cancellationToken);
        }
        // Return a completed task instead of null so callers can safely await it.
        return Task.CompletedTask;

    }

    private ISyncServerNode GetNode(string NodeId)
    {
        return Nodes.FirstOrDefault(node => node.NodeId == NodeId);
    }

    public Task SaveDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        ISyncServerNode Node = GetNode(NodeId);

        if (Node != null)
        {
            return Node.SaveDeltasAsync(deltas, cancellationToken);
        }
        return Task.CompletedTask;
    }
}
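
The routing idea behind the server can be reduced to a few lines. The following sketch uses hypothetical stand-in types (DemoNode and DemoServer) instead of the real ISyncServerNode and SyncServer, and plain strings instead of deltas, so it only illustrates how a request is dispatched to the node that matches its NodeId.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for ISyncServerNode, reduced to what routing needs.
public class DemoNode
{
    public string NodeId { get; set; }
    public List<string> SavedDeltas { get; } = new List<string>();
}

// Hypothetical stand-in for SyncServer.
public class DemoServer
{
    private readonly DemoNode[] _nodes;

    public DemoServer(params DemoNode[] nodes)
    {
        _nodes = nodes;
    }

    // Saves the delta in the matching node; returns false when no node has that id.
    public bool SaveDelta(string nodeId, string delta)
    {
        DemoNode node = _nodes.FirstOrDefault(n => n.NodeId == nodeId);
        if (node == null)
        {
            return false;
        }
        node.SavedDeltas.Add(delta);
        return true;
    }
}

public static class DemoServerProgram
{
    public static void Main()
    {
        var orders = new DemoNode { NodeId = "Orders" };
        var customers = new DemoNode { NodeId = "Customers" };
        var server = new DemoServer(orders, customers);

        server.SaveDelta("Orders", "delta 1");
        Console.WriteLine(orders.SavedDeltas.Count);    // 1
        Console.WriteLine(customers.SavedDeltas.Count); // 0
    }
}
```

Unlike the real SyncServer, which silently ignores an unknown NodeId, this sketch returns false so the caller can tell that nothing was saved. That is a design choice of the sketch, not of the framework.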

The following pictures show the 2 possible server implementations.

 

And that’s it for this post. In the next post I will show the test cases for this implementation.

SyncFramework – Planning the first implementation

Well, it’s time to create our first implementation. First, we need a place to store the deltas generated in the process of tracking changes in a data object.

To keep the implementation simple, we will create a delta store that saves the deltas in memory. This delta store can also be used for testing purposes.

MemoryDeltaStore

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/MemoryDeltaStore.cs

public class MemoryDeltaStore : BIT.Data.Sync.DeltaStoreBase
   {
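       // Initialized here so the constructors that receive no deltas do not leave the list null.
       IList<IDelta> Deltas = new List<IDelta>();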

       public MemoryDeltaStore(IEnumerable<IDelta> Deltas)
       {
           this.Deltas = new List<IDelta>(Deltas);

       }


       protected MemoryDeltaStore()
       {

       }
       //TODO fix the use of MemoryDb
       public MemoryDeltaStore(DeltaStoreSettings deltaStoreSettings) : base(deltaStoreSettings)
       {

       }

       public async override Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           foreach (IDelta delta in deltas)
           {
               cancellationToken.ThrowIfCancellationRequested();
               Deltas.Add(new Delta(delta));
           }
       }

       public override Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string identity, CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           var result = Deltas.Where(d => d.Index.CompareTo(startindex) > 0 && string.Compare(d.Identity, identity, StringComparison.Ordinal) != 0);
           return Task.FromResult(result.Cast<IDelta>());
       }
       public override Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           return Task.FromResult(Deltas.Where(d => d.Index.CompareTo(startindex) > 0).ToList().Cast<IDelta>());
       }
       Guid LastProcessedDelta;
       public override Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           // Nothing is awaited here, so return an already completed task instead of marking the method async
           return Task.FromResult(LastProcessedDelta);
       }

       public override Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           LastProcessedDelta = Index;
           return Task.CompletedTask;
       }

       Guid LastPushedDelta;
       public override Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           return Task.FromResult(LastPushedDelta);
       }

       public override Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           LastPushedDelta = Index;
           return Task.CompletedTask;
       }

       public override Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           return Task.FromResult(Deltas.Count(d => d.Index.CompareTo(startindex) > 0));
       }

       public override Task PurgeDeltasAsync(CancellationToken cancellationToken = default)
       {
           cancellationToken.ThrowIfCancellationRequested();
           Deltas.Clear();
           return Task.CompletedTask;
       }
   }

Now that we have a delta store in place, we need a data object: something we can use to generate data and track how that data changes. Again for test purposes, I have implemented a small in-memory database.

SimpleDatabase

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/SimpleDatabase.cs

public class SimpleDatabase 
   {
       public IDeltaProcessor DeltaProcessor { get; set; }
       public string Identity { get; set; }
       public IDeltaStore DeltaStore { get; set; }
       public SimpleDatabase(IDeltaStore deltaStore, string identity,  List<SimpleDatabaseRecord> Data)
       {
         
           Identity = identity;
           DeltaStore = deltaStore;
           this.Data= Data;
       }
       List<SimpleDatabaseRecord> Data;
       // Returns Task instead of void so callers can await the delta being saved
       // and observe any exception thrown while saving it
       public async Task Update(SimpleDatabaseRecord Instance)
       {
           var ObjectToUpdate = Data.FirstOrDefault(x => x.Key == Instance.Key);
           if (ObjectToUpdate != null)
           {
               var Index = Data.IndexOf(ObjectToUpdate);
               Data[Index] = Instance;
               SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Update, Instance);
               await SaveDelta(item);
           }
       }

       private async Task SaveDelta(SimpleDatabaseModification item)
       {
           var Delta = DeltaStore.CreateDelta(Identity,item);
           await DeltaStore.SaveDeltasAsync(new List<IDelta>() { Delta }, default);
       }

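       public async Task Delete(SimpleDatabaseRecord Instance)
       {
           var ObjectToDelete = Data.FirstOrDefault(x => x.Key == Instance.Key);
           if (ObjectToDelete != null)
           {
               Data.Remove(ObjectToDelete);
               // Record the deletion as a delta so other nodes can replay it
               SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Delete, ObjectToDelete);
               await SaveDelta(item);
           }
       }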
       public async Task Add(SimpleDatabaseRecord Instance)
       {
           Data.Add(Instance);
          
           SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Add, Instance);
           await SaveDelta(item);
       }
   }

In the class above I have implemented methods to add, delete, and update a record. Inside each method I create an instance of an object called SimpleDatabaseModification. I use that object to keep track of which operation is happening and to keep a copy of the instance being handled at that moment; that is what we are going to save as a delta.

SimpleDatabaseModification

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/SimpleDatabaseModification.cs

     public class SimpleDatabaseModification
    {
        public OperationType Operation { get; set; }
        public SimpleDatabaseModification(OperationType operation, SimpleDatabaseRecord record)
        {
            Operation = operation;
            Record = record;
        }
        public SimpleDatabaseRecord Record { get; set; }
    }
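A delta carries its operation as a byte[], so the modification object has to be turned into bytes before it is saved and turned back into an object before it is processed. How the framework does this is not shown in this post, so the following is only a minimal sketch that assumes JSON serialization with System.Text.Json. DemoRecord, DemoModification, ToOperation, and FromOperation are hypothetical names used for illustration.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-ins for SimpleDatabaseRecord and SimpleDatabaseModification
public class DemoRecord
{
    public Guid Key { get; set; }
    public string Text { get; set; }
}

public class DemoModification
{
    public string Operation { get; set; }   // "Add", "Update" or "Delete"
    public DemoRecord Record { get; set; }
}

public static class ModificationBytesDemo
{
    // Packs a modification into the byte[] that a delta's Operation property carries
    // (JSON is an assumption of this sketch, not necessarily what the framework uses)
    public static byte[] ToOperation(DemoModification modification)
    {
        return JsonSerializer.SerializeToUtf8Bytes(modification);
    }

    // Unpacks the bytes again on the node that processes the delta
    public static DemoModification FromOperation(byte[] operation)
    {
        return JsonSerializer.Deserialize<DemoModification>(new ReadOnlySpan<byte>(operation));
    }

    public static void Main()
    {
        var original = new DemoModification
        {
            Operation = "Add",
            Record = new DemoRecord { Key = Guid.NewGuid(), Text = "Hello" }
        };

        byte[] operation = ToOperation(original);
        DemoModification copy = FromOperation(operation);

        Console.WriteLine(copy.Operation + ": " + copy.Record.Text);
    }
}
```

The important point is the round trip: whatever format is chosen, the node that processes the delta must be able to rebuild both the operation type and the record from the bytes alone.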

 

Since SimpleDatabase saves its records in a list, the next step is to create a processor that gets the information out of each delta and uses it to recreate that list. Here is the delta processor:

SimpleDatabaseDeltaProcessor

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/SimpleDatabaseDeltaProcessor.cs

    public class SimpleDatabaseDeltaProcessor :DeltaProcessorBase 
    {
        
        List<SimpleDatabaseRecord> _CurrentText;
        public SimpleDatabaseDeltaProcessor(DeltaStoreSettings deltaStoreSettings, List<SimpleDatabaseRecord> CurrentData) : base(deltaStoreSettings)
        {
            _CurrentText= CurrentData;
        }
        public override Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
        {
           
            cancellationToken.ThrowIfCancellationRequested();
            foreach (IDelta delta in deltas)
            {
                cancellationToken.ThrowIfCancellationRequested();
                var Modification= this.GetDeltaOperations<SimpleDatabaseModification>(delta);
                switch (Modification.Operation)
                {
                    case OperationType.Add:
                        this._CurrentText.Add(Modification.Record);
                        break;
                    case OperationType.Delete:
                        var ObjectToDelete = this._CurrentText.FirstOrDefault(x => x.Key == Modification.Record.Key);
                        if (ObjectToDelete != null)
                        {
                            this._CurrentText.Remove(ObjectToDelete);
                        }
                        break;
                    case OperationType.Update:
                        // FindIndex returns -1 when the record does not exist locally; skip the update in that case
                        var Index = this._CurrentText.FindIndex(x => x.Key == Modification.Record.Key);
                        if (Index >= 0)
                        {
                            this._CurrentText[Index] = Modification.Record;
                        }
                        break;
                }
              
                
            }
            return Task.CompletedTask;
            
        }
    }

 

Well, that is it for this post. In the next post we will create some test scenarios to test our implementations.

 

 

Synchronization Framework Base Classes


Now that we have defined the base contracts necessary for synchronization, we can define some base classes that implement those contracts. The main idea behind these base classes is to make it possible, later on, to inject configuration with .NET dependency injection.

Let’s start with the delta implementation

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Delta.cs

    /// <summary>
    /// An implementation of the IDelta interface. This class is primarily used for serialization and transportation purposes.
    /// </summary>
    public class Delta : IDelta
    {
        public Delta()
        {
        }
        public static Guid GenerateComb()
        {
            return Provider.PostgreSql.Create();
           
        }
        public Delta(string identity, byte[] operation, bool processed = false)
        {

            Identity = identity;
            Operation = operation;
            Processed = processed;
        }
        public Delta(IDelta Delta)
        {
            Identity = Delta.Identity;
            Index = Delta.Index;
            Operation = Delta.Operation;
            // Epoch is part of IDelta, so copy it too
            Epoch = Delta.Epoch;
        }
        public Delta(string identity, Guid index, byte[] operation, bool processed = false)
        {

            Identity = identity;
            Index = index;
            Operation = operation;
            Processed = processed;
        }
        public virtual DateTime Date { get; set; }
        public virtual string Identity { get; set; }

        public virtual Guid Index { get; set; }

        public virtual byte[] Operation { get; set; }
        public virtual bool Processed { get; set; }
        public virtual double Epoch { get; set; }

    }

Now the delta store

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/DeltaStoreBase.cs

   public abstract class DeltaStoreBase : IDeltaStore
   {

       protected DeltaStoreBase()
       {

       }
       protected DeltaStoreSettings _deltaStoreSettings;

       public string Identity { get; private set; }

       public DeltaStoreBase(DeltaStoreSettings deltaStoreSettings)
       {
           this._deltaStoreSettings = deltaStoreSettings;
           Setup();
       }
       protected virtual void Setup()
       {

       }
       public abstract Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken = default);

       public abstract Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string identity, CancellationToken cancellationToken = default);
       //public abstract Task<IEnumerable<IDelta>> GetDeltasToSendAsync(Guid startindex, CancellationToken cancellationToken = default);
       public abstract Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken = default);
       public abstract Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken = default);

       public abstract Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken = default);

       public void SetIdentity(string Identity)
       {
           this.Identity = Identity;
       }

       public abstract Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken = default);
       public abstract Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken = default);

       public abstract Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken=default);
       public abstract Task PurgeDeltasAsync(CancellationToken cancellationToken);
   }

and finally, the delta processor

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/DeltaProcessorBase.cs

public abstract class DeltaProcessorBase : IDeltaProcessor
{
    protected DeltaStoreSettings _deltaStoreSettings;
    public DeltaProcessorBase(DeltaStoreSettings deltaStoreSettings)
    {
        _deltaStoreSettings = deltaStoreSettings;
    }

    public abstract Task ProcessDeltasAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken);

}

That’s it for this post. See you in the next post, “Planning the first implementation”.

 

Let’s write a Synchronization Framework in C#


OK, in the last post we defined the conceptual parts of a synchronization framework. Now let’s create the code that represents those parts.

Delta

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/IDelta.cs

 

/// <summary>
/// Represents a transaction made to the database 
/// </summary>
public interface IDelta
{
    /// <summary>
    /// The date when the operation happened
    /// </summary>
    double Epoch { get; set; }
    /// <summary>
    /// Who created the delta
    /// </summary>
    string Identity { get; set; }
    /// <summary>
    /// The unique identifier of the delta
    /// </summary>
    Guid Index { get; }
    /// <summary>
    /// The database transaction(s) that represents this delta
    /// </summary>
    byte[] Operation { get; set; }  
}

 

Epoch: The date when the operation happened

Identity: Who created the delta

Index: A sortable GUID

Operation: The database transaction(s) that represents this delta
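Because the index is sortable, "give me everything newer than X" becomes a simple comparison. The sketch below illustrates that idea only. MakeIndex is a hypothetical stand-in for a real sequential GUID generator: it puts a sequence number into the first field of the GUID, which is the first field that Guid.CompareTo looks at.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class IndexOrderingDemo
{
    // Hypothetical stand-in for a sequential (comb) GUID generator:
    // the sequence number is stored in the first GUID field
    public static Guid MakeIndex(int sequence)
    {
        return new Guid(sequence, 0, 0, new byte[8]);
    }

    // Returns the indexes that come after startIndex, oldest first
    public static List<Guid> NewerThan(IEnumerable<Guid> indexes, Guid startIndex)
    {
        return indexes
            .Where(i => i.CompareTo(startIndex) > 0)
            .OrderBy(i => i)
            .ToList();
    }

    public static void Main()
    {
        var indexes = new[] { MakeIndex(3), MakeIndex(1), MakeIndex(2) };

        // Everything created after index 1, in creation order
        var pending = NewerThan(indexes, MakeIndex(1));

        Console.WriteLine(pending.Count);
        Console.WriteLine(pending[0] == MakeIndex(2));
    }
}
```

With this property a node only needs to remember one value, the index of the last delta it has seen, to know where to resume. Passing Guid.Empty as the start index returns every index.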

 

Delta Processor

https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaProcessor.cs

public interface IDeltaProcessor
{
    /// <summary>
    /// Extracts the content of an IEnumerable of deltas and processes it on the current data object
    /// </summary>
    /// <param name="deltas">an IEnumerable of deltas</param>
    /// <param name="cancellationToken">Cancellation token</param>
    /// <returns>An empty task</returns>
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

As you can see, the delta processor is really simple. It contains only one method, which is in charge of extracting the content of a group of deltas and applying those differences to the current data object.
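To make the contract more tangible, here is a minimal sketch of a processor. The two interfaces are repeated from this post so that the example compiles on its own. TextDelta and LineListProcessor are hypothetical: the data object is just a list of lines, and each delta is assumed to carry one UTF-8 encoded line in its Operation.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// The two contracts from this post, repeated so the sketch compiles on its own
public interface IDelta
{
    double Epoch { get; set; }
    string Identity { get; set; }
    Guid Index { get; }
    byte[] Operation { get; set; }
}

public interface IDeltaProcessor
{
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

// Hypothetical delta whose Operation is one UTF-8 encoded line of text
public class TextDelta : IDelta
{
    public double Epoch { get; set; }
    public string Identity { get; set; }
    public Guid Index { get; set; }
    public byte[] Operation { get; set; }
}

// Hypothetical processor: processing a delta means appending the line it carries
public class LineListProcessor : IDeltaProcessor
{
    public List<string> Lines { get; } = new List<string>();

    public Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        foreach (var delta in deltas)
        {
            cancellationToken.ThrowIfCancellationRequested();
            Lines.Add(Encoding.UTF8.GetString(delta.Operation));
        }
        return Task.CompletedTask;
    }
}

public static class ProcessorDemo
{
    public static async Task Main()
    {
        var deltas = new IDelta[]
        {
            new TextDelta { Identity = "A", Index = Guid.NewGuid(), Operation = Encoding.UTF8.GetBytes("first line") },
            new TextDelta { Identity = "A", Index = Guid.NewGuid(), Operation = Encoding.UTF8.GetBytes("second line") },
        };

        var processor = new LineListProcessor();
        await processor.ProcessDeltasAsync(deltas, CancellationToken.None);

        Console.WriteLine(string.Join(" | ", processor.Lines));
    }
}
```

The processor does not know or care where the deltas came from. It only needs to understand the format of the Operation bytes for its own kind of data object.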

 

Delta Store

https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaStore.cs

public interface IDeltaStore
    {
        string Identity { get; }
        void SetIdentity(string Identity);
        /// <summary>
        /// Saves the given deltas in the current store
        /// </summary>
        /// <param name="deltas">The deltas to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
        /// <summary>
        /// Gets the deltas generated by other nodes with indices greater than the start index
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="myIdentity">The identity of the current node </param>
        /// <param name="cancellationToken">a Cancellation token</param>
        /// <returns>An IEnumerable with deltas generated by other nodes</returns>
        Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken);
        /// <summary>
        /// Get all deltas in the store with an index greater than the start index
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="cancellationToken">a cancellation token</param>
        /// <returns>An IEnumerable of deltas</returns>
        Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken);
        /// <summary>
        /// Gets the count of deltas with indices greater than the start index
        /// </summary>
        /// <param name="startindex">The start index</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>The count</returns>
        Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken);
        /// <summary>
        /// Gets the index of the last delta processed by this data object
        /// </summary>
        /// <param name="cancellationToken"> cancellation token</param>
        /// <returns>The index of the last delta processed by this data object</returns>
        Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken);
        /// <summary>
        /// Sets the index of the last delta processed by this data object
        /// </summary>
        /// <param name="Index">The index to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken);
        /// <summary>
        ///  Gets the index of the last delta pushed to the server node
        /// </summary>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>the index of the last delta pushed to the server node</returns>
        Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken);
        /// <summary>
        /// Sets the index of the last delta pushed to the server node
        /// </summary>
        /// <param name="Index">The index to be saved</param>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken);
        /// <summary>
        /// Delete all deltas in the store
        /// </summary>
        /// <param name="cancellationToken">A cancellation token</param>
        /// <returns>An empty task</returns>
        Task PurgeDeltasAsync(CancellationToken cancellationToken);

    }

SaveDeltasAsync: Saves the IEnumerable<IDelta> of deltas in the current store

GetDeltasFromOtherNodes: Gets an IEnumerable<IDelta> of deltas generated by other nodes with indices greater than the start index

GetDeltasAsync: Get all deltas in the store with an index greater than the start index

GetDeltaCountAsync: Gets the count of deltas with indices greater than the start index

GetLastProcessedDeltaAsync: Gets the index of the last delta processed by this data object

SetLastProcessedDeltaAsync: Sets the index of the last delta processed by this data object

GetLastPushedDeltaAsync: Gets the index of the last delta pushed to the server node

SetLastPushedDeltaAsync: Sets the index of the last delta pushed to the server node

PurgeDeltasAsync: Delete all deltas in the store
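To show how these methods fit together, here is a rough sketch of one "pull" cycle. It is not the framework's implementation. MiniDelta and MiniDeltaStore are trimmed, synchronous stand-ins for IDelta and IDeltaStore, and a single store holds both the deltas and the bookmark, which is a simplification made to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Trimmed, synchronous stand-ins for IDelta and IDeltaStore
// (assumptions made for brevity; the real contracts are async)
public class MiniDelta
{
    public string Identity { get; set; }
    public Guid Index { get; set; }
    public string Payload { get; set; }
}

public class MiniDeltaStore
{
    private readonly List<MiniDelta> _deltas = new List<MiniDelta>();
    private Guid _lastProcessed = Guid.Empty;

    public void SaveDeltas(IEnumerable<MiniDelta> deltas)
    {
        _deltas.AddRange(deltas);
    }

    // Deltas created by other nodes after startIndex, oldest first
    public List<MiniDelta> GetDeltasFromOtherNodes(Guid startIndex, string myIdentity)
    {
        return _deltas
            .Where(d => d.Index.CompareTo(startIndex) > 0 && d.Identity != myIdentity)
            .OrderBy(d => d.Index)
            .ToList();
    }

    public Guid GetLastProcessedDelta() { return _lastProcessed; }
    public void SetLastProcessedDelta(Guid index) { _lastProcessed = index; }
}

public static class PullDemo
{
    // One pull cycle: fetch what other nodes produced since the bookmark,
    // apply it to the local data, then move the bookmark forward
    public static int Pull(MiniDeltaStore store, string myIdentity, List<string> data)
    {
        var pending = store.GetDeltasFromOtherNodes(store.GetLastProcessedDelta(), myIdentity);
        foreach (var delta in pending)
        {
            data.Add(delta.Payload);
        }
        if (pending.Count > 0)
        {
            store.SetLastProcessedDelta(pending[pending.Count - 1].Index);
        }
        return pending.Count;
    }

    public static void Main()
    {
        var store = new MiniDeltaStore();
        store.SaveDeltas(new[]
        {
            new MiniDelta { Identity = "A", Index = new Guid(1, 0, 0, new byte[8]), Payload = "from A" },
            new MiniDelta { Identity = "B", Index = new Guid(2, 0, 0, new byte[8]), Payload = "from B" },
        });

        var data = new List<string>();
        Console.WriteLine(Pull(store, "A", data)); // node A only receives the delta created by B
        Console.WriteLine(Pull(store, "A", data)); // nothing new on the second pull
    }
}
```

The bookmark is what keeps the second pull empty: once a delta has been processed, its index becomes the new start index, so the same delta is never applied twice.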

That’s all for this post. In the next post we will define the base classes that implement the interfaces described above.

Parts of a Synchronization Framework


In the last post, we talked about what deltas are and how we can use them to synchronize data structures.

So, in this post, I will describe the parts needed to implement delta-based synchronization. Let’s start:

  • Data Object: any database, object, graph, or file system that we are tracking for synchronization
  • Delta: the data difference between the original and the current state of a data object
  • Node: a point in the synchronization network. There are 2 types of nodes:
    1. Client node: a node that is able to produce and process deltas
    2. Server node: a node that is used only to exchange deltas; it can optionally process deltas too
  • Delta Store: storage where you can save deltas so you can later exchange them with other nodes
  • Delta Processor: a utility that helps you include the deltas created in other nodes in your current copy of the data object

Now let’s apply these concepts to synchronize a database.

Each database that we want to synchronize will be a client node. A client node should be able to produce and store deltas and to process deltas produced by other nodes, so our database node should look like the following diagram.

The server node can be as simple as a delta store exposed through an HTTP API, as you can see in the basic server node diagram, or it can use several delta stores and delta processors, as shown in the complex server node diagram.

                                         

 

And with those diagrams, I finish this post. In the next post we will implement those concepts in C# code.

Data synchronization in a few words


Synchronizing data is one of the most challenging tasks out there, especially if you are working with LOB (line-of-business) applications.

There are many ways to synchronize data. The most common technique is to compare records by modification date and then merge the data to create a master record.

The main problem here is that you need a way to compare each type of record, so it gets cumbersome as soon as the number of types in your application begins to grow.

Also, you have to keep a log of what gets created and what gets deleted so you can make the same changes on the other nodes.
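To make the record-comparison approach concrete, here is a minimal sketch of a "latest modification wins" merge. Customer, its ModifiedOn property, and TimestampMerge are hypothetical names invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record type; every record type in the application
// would need its own comparison and merge logic like this
public class Customer
{
    public int Id { get; set; }
    public string Name { get; set; }
    public DateTime ModifiedOn { get; set; }
}

public static class TimestampMerge
{
    // Builds the "master" list: for every Id, keep the copy that was modified last
    public static List<Customer> Merge(IEnumerable<Customer> local, IEnumerable<Customer> remote)
    {
        return local.Concat(remote)
            .GroupBy(c => c.Id)
            .Select(g => g.OrderByDescending(c => c.ModifiedOn).First())
            .OrderBy(c => c.Id)
            .ToList();
    }

    public static void Main()
    {
        var local = new List<Customer>
        {
            new Customer { Id = 1, Name = "Ana", ModifiedOn = new DateTime(2021, 1, 1) },
        };
        var remote = new List<Customer>
        {
            new Customer { Id = 1, Name = "Ana Maria", ModifiedOn = new DateTime(2021, 2, 1) },
            new Customer { Id = 2, Name = "Joe", ModifiedOn = new DateTime(2021, 1, 15) },
        };

        foreach (var customer in Merge(local, remote))
        {
            Console.WriteLine(customer.Id + ": " + customer.Name);
        }
    }
}
```

Notice what this merge cannot do: if a record exists on only one side, there is no way to tell whether it was created there or deleted on the other side. That is why this technique also needs a separate log of creations and deletions.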

Delta-based synchronization

Delta-based synchronization is the problem of identifying “what changed” between each execution of a sync process.

A directed delta, also called a change, is a sequence of (elementary) change operations which, when applied to one version V1, yields another version V2 (note the correspondence to transaction logs in databases).

In delta synchronization, there are 2 main tasks

  • Record the deltas (the small differences in data between nodes)
  • Transmit these differences to the other nodes so they can process them
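The two tasks above can be sketched with a tiny key/value store. This is only an illustration under simplifying assumptions: Change and TrackedStore are hypothetical types, and "transmitting" is reduced to handing the recorded list to another instance in the same process.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified delta: one elementary change to a key/value store
public class Change
{
    public string Kind { get; set; }   // "set" or "remove"
    public string Key { get; set; }
    public string Value { get; set; }
}

// A data object that records every change it makes to itself
public class TrackedStore
{
    public Dictionary<string, string> Data { get; } = new Dictionary<string, string>();
    public List<Change> Recorded { get; } = new List<Change>();

    public void Set(string key, string value)
    {
        var change = new Change { Kind = "set", Key = key, Value = value };
        Apply(change);
        Recorded.Add(change);   // task 1: record the delta
    }

    public void Remove(string key)
    {
        var change = new Change { Kind = "remove", Key = key };
        Apply(change);
        Recorded.Add(change);   // task 1: record the delta
    }

    // Task 2 ends here on the receiving node: apply a delta produced elsewhere
    public void Apply(Change change)
    {
        if (change.Kind == "set") Data[change.Key] = change.Value;
        else if (change.Kind == "remove") Data.Remove(change.Key);
    }
}

public static class DeltaReplayDemo
{
    public static void Main()
    {
        var master = new TrackedStore();
        master.Set("post:1", "Hello");
        master.Set("post:2", "Draft");
        master.Remove("post:2");

        // The client starts at version V1 (empty) and replays the deltas in order
        var client = new TrackedStore();
        foreach (var change in master.Recorded)
        {
            client.Apply(change);
        }

        Console.WriteLine(client.Data.Count);
        Console.WriteLine(client.Data["post:1"]);
    }
}
```

After the replay, the client holds the same data as the master without ever comparing records, and the deletion travels the same way as any other change.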

Implementing Delta-based synchronization for relational databases

The schema above represents a blog’s database. It’s a really simple schema, so it’s easy to understand. Now, this is the scenario:

We have the main database that we will call “Master” and 2 other databases named client A and client B.

Now let’s insert data in the master.

Each DML statement should be converted into a delta (or a data difference).

Δ1

Δ2

Δ3

Copy deltas Δ1, Δ2, and Δ3 to the clients

So, after processing the deltas on each client, the data in all databases should look like the picture below

 

 

So that’s it for this post. In the next post we will examine each part that is necessary to do delta-based synchronization.