
      JMP: Verify Google Play App Purchase on Your Server

      news.movim.eu / PlanetJabber • 11 April, 2023 • 5 minutes

    We are preparing for the first-ever Google Play Store launch of Cheogram Android as part of JMP coming out of beta later this year.  One of the things we wanted to “just work” for Google Play users is paying for the app and getting their first month of JMP “bundled” into that purchase price, to smooth the common onboarding experience.  So how do the JMP servers know that the app communicating with them is running a version bought from Google Play, as opposed to our builds, F-Droid’s builds, or someone’s own builds?  And how do we ensure that this person hasn’t already received a bundled month before?  The documentation available on how to do this is surprisingly sparse, so let’s work through it together.

    Client Side

    Google publishes an official Licensing Verification Library for communicating with Google Play from inside an Android app, to determine whether an install of the app can be associated with a Google Play purchase.  Most existing documentation focuses on using this library; however, it does not expose anything in its callbacks other than “yes, license verified” or “no, not verified”.  This lets an app check whether it is itself a purchased copy, but is not so useful for communicating that proof onward to a server.  The library also contains some exciting snippets like:

    // Base64 encoded -
    // com.android.vending.licensing.ILicensingService
    // Consider encoding this in another way in your
    // code to improve security
    Base64.decode(
        "Y29tLmFuZHJvaWQudmVuZGluZy5saWNlbnNpbmcuSUxpY2Vuc2luZ1NlcnZpY2U=")))

    Which implies that they expect developers to fork this code in order to use it.  Digging into the code, we find in LicenseValidator.java:

    public void verify(PublicKey publicKey, int responseCode, String signedData, String signature)

    Which looks like exactly what we need: the actual signed assertion from Google Play, plus its signature!  So we just need a small patch to pass those along to the callback, in addition to the response code it currently passes.  Then we can use the excellent JitPack to include the forked library in our app:

    implementation 'com.github.singpolyma:play-licensing:1c637ea03c'

    Then we write a small class in our app code to actually use it:

    import android.content.Context;
    import com.google.android.vending.licensing.*;
    import java.util.function.BiConsumer;
    
    public class CheogramLicenseChecker implements LicenseCheckerCallback {
        private final LicenseChecker mChecker;
        private final BiConsumer<String, String> mCallback;
    
        public CheogramLicenseChecker(Context context, BiConsumer<String, String> callback) {
            mChecker = new LicenseChecker(  
                context,  
                new StrictPolicy(), // Want to get a signed item every time  
                context.getResources().getString(R.string.licensePublicKey)  
            );
            mCallback = callback;
        }
    
        public void checkLicense() {
            mChecker.checkAccess(this);
        }
    
        @Override
        public void dontAllow(int reason) {
            mCallback.accept(null, null);
        }
    
        @Override
        public void applicationError(int errorCode) {
            mCallback.accept(null, null);
        }
    
        @Override
        public void allow(int reason, ResponseData data, String signedData, String signature) {
            mCallback.accept(signedData, signature);
        }
    }

    Here we use the StrictPolicy from the Licensing Verification Library because we want fresh signed data every time; if the device is offline, the whole question is moot, since we won’t be able to contact the server anyway.

    This code assumes you have put the Base64-encoded licensing public key from “Monetisation Setup” in the Play Console into a string resource, R.string.licensePublicKey.

    Then we need to communicate this to the server, which you can do in whatever way makes sense for your protocol; with XMPP we can easily add custom elements to our existing requests, like so:

    new com.cheogram.android.CheogramLicenseChecker(context, (signedData, signature) -> {
        if (signedData != null && signature != null) {
            c.addChild("license", "https://ns.cheogram.com/google-play").setContent(signedData);
            c.addChild("licenseSignature", "https://ns.cheogram.com/google-play").setContent(signature);
        }
    
        xmppConnectionService.sendIqPacket(getAccount(), packet, (a, iq) -> {
            session.updateWithResponse(iq);
        });
    }).checkLicense();

    Server Side

    When trying to verify this on the server side, we quickly run into some new issues.  What format is this public key in?  The console just says “public key”, and the value is Base64, but that’s about it.  What signature algorithm is used for the signed data?  What is the format of the data itself?  Back to the library code!

    private static final String KEY_FACTORY_ALGORITHM = "RSA";
    …
    byte[] decodedKey = Base64.decode(encodedPublicKey);
    …
    new X509EncodedKeySpec(decodedKey)

    So we can see it is an X.509-related encoding, and it indeed turns out to be Base64-encoded DER.  So we can run this:

    echo "BASE64_STRING" | base64 -d | openssl rsa -pubin -inform der -in - -text

    to get the raw properties we might need for any library (key size, modulus, and exponent).  Of course, if your library supports parsing DER directly, you can also use that.
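    For instance, on the JVM the standard java.security APIs can parse this encoding directly. A minimal sketch (the class and method names here are mine, purely for illustration, not part of the JMP codebase):

```java
import java.security.KeyFactory;
import java.security.PublicKey;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class LoadKey {
    // Parse the Base64-encoded DER (X.509 SubjectPublicKeyInfo) string
    // shown in the Play Console into a usable RSA PublicKey.
    public static PublicKey load(String base64Key) throws Exception {
        byte[] der = Base64.getDecoder().decode(base64Key);
        return KeyFactory.getInstance("RSA")
                .generatePublic(new X509EncodedKeySpec(der));
    }
}
```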

    import java.security.Signature;
    …
    private static final String SIGNATURE_ALGORITHM = "SHA1withRSA";
    …
    Signature sig = Signature.getInstance(SIGNATURE_ALGORITHM);
    sig.initVerify(publicKey);
    sig.update(signedData.getBytes());

    Combined with the Java documentation, we can thus say that the signature algorithm is PKCS#1 padded RSA with SHA1.
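    As a standalone Java sketch (class and method names are mine), the verification can be expressed with the same JCA algorithm name the library uses:

```java
import java.nio.charset.StandardCharsets;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;

public class LicenseVerifier {
    // Check Google Play's signature over the signed data.
    // "SHA1withRSA" is PKCS#1 v1.5 padded RSA with SHA-1;
    // the signature arrives Base64-encoded from the client.
    public static boolean verify(PublicKey publicKey, String signedData,
                                 String base64Signature) throws Exception {
        Signature sig = Signature.getInstance("SHA1withRSA");
        sig.initVerify(publicKey);
        sig.update(signedData.getBytes(StandardCharsets.UTF_8));
        return sig.verify(Base64.getDecoder().decode(base64Signature));
    }
}
```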

    And finally:

    String[] fields = TextUtils.split(mainData, Pattern.quote("|"));
    data.responseCode = Integer.parseInt(fields[0]);
    data.nonce = Integer.parseInt(fields[1]);
    data.packageName = fields[2];
    data.versionCode = fields[3];
    // Application-specific user identifier.
    data.userId = fields[4];
    data.timestamp = Long.parseLong(fields[5]);

    So that’s the format of the data: pipe-separated text.  The main field of interest for us is userId, which is (as a comment in the library says) “a user identifier unique to the <application, user> pair”.  So, in our server code:

    import Control.Error (atZ)
    import qualified Data.ByteString.Base64 as Base64
    import qualified Data.Text as T
    import Crypto.Hash.Algorithms (SHA1(SHA1))
    import qualified Crypto.PubKey.RSA as RSA
    import qualified Crypto.PubKey.RSA.PKCS15 as RSA
    import qualified Data.XML.Types as XML
    
    googlePlayUserId
        | googlePlayVerified = (T.split (=='|') googlePlayLicense) `atZ` 4
        | otherwise = Nothing
    googlePlayVerified = fromMaybe False $ fmap (\pubKey ->
        RSA.verify (Just SHA1) pubKey (encodeUtf8 googlePlayLicense)
            (Base64.decodeLenient $ encodeUtf8 googlePlaySig)
        ) googlePlayPublicKey
    googlePlayLicense = mconcat $ XML.elementText
        =<< XML.isNamed (s"{https://ns.cheogram.com/google-play}license")
        =<< XML.elementChildren payload
    googlePlaySig = mconcat $ XML.elementText
        =<< XML.isNamed (s"{https://ns.cheogram.com/google-play}licenseSignature")
        =<< XML.elementChildren payload

    We can then use the verified and extracted googlePlayUserId value to check whether this user has already received a bundled month and, if not, provide them with one during signup.
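    As an aside, if your server ran on the JVM instead of Haskell, the same field extraction could be sketched like this (the class name is mine; it only handles the six fields shown above):

```java
import java.util.regex.Pattern;

public class PlayResponseData {
    public int responseCode;
    public int nonce;
    public String packageName;
    public String versionCode;
    public String userId;
    public long timestamp;

    // Split the pipe-separated response body into its six fields,
    // mirroring the parsing in the licensing library's ResponseData.java.
    public static PlayResponseData parse(String mainData) {
        String[] fields = mainData.split(Pattern.quote("|"));
        PlayResponseData data = new PlayResponseData();
        data.responseCode = Integer.parseInt(fields[0]);
        data.nonce = Integer.parseInt(fields[1]);
        data.packageName = fields[2];
        data.versionCode = fields[3];
        // Application-specific user identifier, unique per
        // <application, user> pair: the field we care about.
        data.userId = fields[4];
        data.timestamp = Long.parseLong(fields[5]);
        return data;
    }
}
```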

    • This post is public

      blog.jmp.chat/b/play-purchase-verification-2023


      Erlang Solutions: You’ve been curious about LiveView, but you haven’t gotten into it

      news.movim.eu / PlanetJabber • 6 April, 2023 • 21 minutes

    As a backend developer, I’ve spent most of my programming career away from frontend development. Whether it’s React/Elm for the web or Swift/Kotlin for mobile, these are fields of knowledge that fall outside of what I usually work with.

    Nonetheless, I always wanted to have a tool at my disposal for building rich frontends. While the web seemed like the platform with the lowest bar of entry for this, the size of the Javascript ecosystem had become so vast that familiarizing oneself with it was no small task.

    This is why I got very excited when Chris McCord first showed LiveView to the world. Building interactive frontends, with no Javascript required? This sounded like it was made for all of us Elixir backend developers that were “frontend curious”.

    However, if you haven’t already jumped into it, you might be hesitant to start. After all: it’s often not just about learning LiveView as if you were writing a greenfield project, but about how you would add LiveView into that Phoenix app that you’re already working on.

    Therefore, throughout this guide, I’ll presume that you already have an existing project that you wish to integrate LiveView into. If you have the luxury of a clean slate, then other resources (such as the Programming Phoenix LiveView book, by Bruce A. Tate and Sophie DeBenedetto) may be of more use.

    I hope that this article may serve you well as a starting point!

    Will it work for my use case?

    You might have some worries about whether LiveView is a technology that you can introduce to your application. After all: no team likes to adopt a technology that they later figure out does not suit their use case.

    There are some properties of LiveView which are inherent to the technology, and therefore must be considered:

    Offline mode

    The biggest question is whether you need an offline mode for your application. My guess is that you probably do not need it, but if you do, LiveView is not the technology for you. The reason for this is that LiveView is rendered on the backend, necessitating communication with it.

    Latency

    The second biggest question: do you expect the latency from your clients to the server to be high, and would it being high be a serious detriment to your application?

    As Chris McCord put it in his announcement blog post on the Dockyard blog:

    “Certain use cases and experiences demand zero-latency, as well as offline capabilities. This is where Javascript frameworks like React, Ember, etc., shine.”

    Almost every interaction with a LiveView interface will send a request to the server; while requests will have highly optimized payloads, if you expect the average round trip from client to server to be too many milliseconds, then the user experience will suffer. LiveView ships with tools for testing your application with increased latency, but if you already know that there’s a certain latency ceiling that your clients must not exceed, yet very likely would, then LiveView may not be suitable.

    If these are not of concern to your use case, then let’s get going!

    What does it take for me to start?

    Phoenix setup

    First of all, you’ll want to have a recent version of Phoenix, and your code up-to-date. Following are upgrade guides for older projects:

    LiveView setup

    The next step is to install LiveView into your existing project. The LiveView documentation has a great section on the subject: Installing LiveView into an existing project.

    The guide is rather straightforward, so I will not reiterate its contents here. The only comment I’ll add is that the section at the very end about adding a topbar is (as the documentation points out) optional. It should be said, however, that this is added by default in new LiveView projects, so if you want a setup that’s as close as possible to a freshly generated project, you should include it.

    At this point, you should have everything ready for introducing your own LiveView code!

    Quick LiveView overview

    Before we get to the actual coding, let’s take a quick look at the life cycle of a LiveView page. Here’s a high-level overview:

    The first request made to a LiveView route will be a plain HTTP request. The router will invoke a LiveView module, which calls the mount/3 function and then the render/1 function. This will render a static page (SEO-friendly out-of-the-box, by the way!), with the required Javascript for LiveView to work. The page then opens a WebSocket connection between the client and the server.

    After the WebSocket connection has been established, we get into the LiveView life cycle:

    Note that mount/3 and render/1 will be called again, this time over the WebSocket connection. While this probably will not be something you need to worry about when writing your first LiveView pages, it might be of relevance to know that this is the case (discussion about this can be read here). If you have a very expensive function call to make, and you only want to do it once, consider using the connected?/1 function.

    After render/1 has been called a second time, we get into the LiveView loop: wait for events, send the events over the wire, change the state on the server, then send back the minimal required data for updating the page on the client.

    Let’s now see how we’ll need to change your code to get to this LiveView flow.

    Making things live

    Now you might be asking:

    “OK, so the basics have been set up. What are the bare minimum things to get a page to be live?”

    You’ll need to do the following things:

    1. Convert an existing route to a live one
    2. Convert the controller module into a live module
    3. Modify the templates
    4. Introduce liveness

    Let’s go over them, one by one:

    Bringing life to the dead

    Here’s a question I once had, that you might be wondering:

    If I’ve got a regular (“dead”) Phoenix route, can I just add something live to a portion of the page, on the existing “dead” route?

    Considering how LiveView works, I’d like to transform the question into two new (slightly different) questions:

    1. Can one preserve the current routes and controllers, having them execute live code?
    2. Can one express the live interactions in the dead controllers?

    The answer to the first question: yes, but generally you won’t. You won’t, because of the answer to the second question: no, you’ll need separate live modules to express the live interactions.

    This leads to an important point:

    If you want some part of a page to be live, then your whole page has to be live.

    Technically, you can have the route be something other than live (e.g. a get route), and you would then use Phoenix.LiveView.Controller.live_render/3 in a “dead” controller function to render a LiveView module. This does still mean, however, that the page (the logic and templates) will be defined by the live module. You’re not “adding something live to a portion of the dead page”, but rather delegating to a live module from a dead route; you’ll still have to migrate the logic and templates to the live module.

    Therefore, your live code will be in LiveView modules (instead of your current controller modules), invoked by live routes. As a sidenote: while it’s not covered by this article, you’ll eventually group live routes with live_session/3, enabling redirects between routes without full page reloads.

    Introducing a live route

    Many tutorials and videos about LiveView use the example of programming a continuously updating rendering of a thermostat. Let’s therefore presume that you’ve got a home automation application, and up until now you had to go to /thermostats and refresh the page to get the latest data.

    The router.ex might look something like this:

    defmodule HomeAutomationWeb.Router do
      use HomeAutomationWeb, :router
    
      pipeline :browser do
        # ...
      end
    
      pipeline :logged_in do
        # ...
      end
    
      scope "/", HomeAutomationWeb do
        pipe_through [:browser, :logged_in]
    
        # ...
    
        resources "/thermostats", ThermostatController
        post "/thermostats/reboot", ThermostatController, :reboot
      end
    end
    

    This is a rather simple router (with some lines removed for brevity), but you can probably figure out how this compares to your code. We’re using a call to Phoenix.Router.resources/2 here to cover a standard set of CRUD actions; your set of actions could be different.

    Let’s introduce the following route after the post-route:

    live "/live/thermostats", ThermostatLive
    

    The ThermostatLive will be the module to which we’ll be migrating logic from ThermostatController.

    Creating a live module to migrate to

    Creating a skeleton

    Let’s start by creating a directory for LiveView modules, then create an empty thermostat_live.ex in that directory.

    $ mkdir lib/home_automation_web/live
    $ touch lib/home_automation_web/live/thermostat_live.ex
    

    It might seem a bit strange to create a dedicated directory for the live modules, considering that the dead parts of your application already have controller/template/view directories. This convention, however, allows one to make use of the following feature from the Phoenix.LiveView.render/1 callback (slight changes by me, for readability):

    If you don’t define [render/1 in your LiveView module], LiveView will attempt to render a template in the same directory as your LiveView. For example, if you have a LiveView named MyApp.MyCustomView inside lib/my_app/live_views/my_custom_view.ex, Phoenix will look for a template at lib/my_app/live_views/my_custom_view.html.heex.

    This means that it’s common for LiveView projects to have a live directory with file pairs, such as foobar.ex and foobar.html.heex, i.e. module and corresponding template. Whether you inline your template in the render/1 function or put it in a dedicated file is up to you.

    Open the lib/home_automation_web/live/thermostat_live.ex file, and add the following skeleton of the ThermostatLive module:

    defmodule HomeAutomationWeb.ThermostatLive do
      use HomeAutomationWeb, :live_view
    
      def mount(_params, _session, socket) do
        {:ok, socket}
      end
    
      def render(assigns) do
        ~H"""
        <div id="thermostats">
          <p>Thermostats</p>
        </div>
        """
      end
    end
    

    There are two mandatory callbacks in a LiveView module: mount/3, and render/1. As mentioned earlier, you can leave out render/1 if you have a template file with the right file name. You can also leave out the mount/3, but that would mean that you neither want to set any state, nor do any work on mount, which is unlikely.

    Migrating mount logic

    Let’s now look at our imagined HomeAutomationWeb.ThermostatController, to see what we’ll be transferring over to ThermostatLive:

    defmodule HomeAutomationWeb.ThermostatController do
      use HomeAutomationWeb, :controller
    
      alias HomeAutomation.Thermostat
    
      def index(conn, _params) do
        thermostats = Thermostat.all_for_user(conn.assigns.current_user)
    
        render(conn, :index, thermostats: thermostats)
      end
    
      # ...
    
      def reboot(conn, %{"id" => id}) do
        {:ok, thermostat} =
          id
          |> Thermostat.get!()
          |> Thermostat.reboot()
    
        conn
        |> put_flash(:info, "Thermostat '#{thermostat.room_name}' rebooted.")
        |> redirect(to: Routes.thermostat_path(conn, :index))
      end
    end
    

    We’ll be porting a subset of the functions that are present in the controller module: index/2 and reboot/2. This is mostly to have two somewhat different controller actions to work with.

    Let’s first focus on the index/2 function. We could imagine that Thermostat.all_for_user/1 makes a database call of some kind, possibly with Ecto. conn.assigns.current_user would be added to the assigns by the logged_in Plug in the pipeline in the router.

    Let’s naively move over the ThermostatController.index/2 logic to the LiveView module, and take it from there:

    defmodule HomeAutomationWeb.ThermostatLive do
      use HomeAutomationWeb, :live_view
    
      alias HomeAutomation.Thermostat
    
      def mount(_params, _session, socket) do
        thermostats = Thermostat.all_for_user(socket.assigns.current_user)
    
        {:ok, assign(socket, %{thermostats: thermostats})}
      end
    
      def render(assigns) do
        ~H"""
        <div id="thermostats">
          <p>Thermostats</p>
        </div>
        """
      end
    end
    

    Firstly, we’re inserting the index/2 logic into the mount/3 function of ThermostatLive, meaning that the data will be called for on page load.

    Secondly, notice that we changed the argument to Thermostat.all_for_user/1 from conn.assigns.current_user to socket.assigns.current_user. This is just a change of variable name, of course, but it signifies a change in the underlying data structure: you’re not working with a Plug.Conn struct, but rather with a Phoenix.LiveView.Socket.

    So far we’ve written some sample template code inside the render/1 function definition, and we haven’t seen the actual templates that would render the thermostats, so let’s get to those.

    Creating live templates

    Let’s presume that you have a rather simple index page, listing all of your thermostats.

    <h1>Listing Thermostats</h1>
    
    <%= for thermostat <- @thermostats do %>
      <div class="thermostat">
        <div class="row">
          <div class="column">
            <ul>
              <li>Room name: <%= thermostat.room_name %></li>
              <li>Temperature: <%= thermostat.temperature %></li>
            </ul>
          </div>
    
          <div class="column">
            Actions: <%= link("Show", to: Routes.thermostat_path(@conn, :show, thermostat)) %>
            <%= link("Edit", to: Routes.thermostat_path(@conn, :edit, thermostat)) %>
            <%= link("Delete",
              to: Routes.thermostat_path(@conn, :delete, thermostat),
              method: :delete,
              data: [confirm: "Are you sure?"]
            ) %>
          </div>
    
          <div class="column">
            <%= form_for %{}, Routes.thermostat_path(@conn, :reboot), fn f -> %>
              <%= hidden_input(f, :id, value: thermostat.id) %>
              <%= submit("Reboot", class: "rounded-full") %>
            <% end %>
          </div>
        </div>
      </div>
    <% end %>
    
    <%= link("New Thermostat", to: Routes.thermostat_path(@conn, :new)) %>
    

    Each listed thermostat has the standard resource links of Show/Edit/Delete, with a New-link at the very end of the page. The only thing that goes beyond the usual CRUD actions is the form_for, defining a Reboot-button. The Reboot-button will initiate a request to the POST /thermostats/reboot route.

    As previously mentioned, we can either move this template code into the ThermostatLive.render/1 function, or we can create a template file named lib/home_automation_web/live/thermostat_live.html.heex. To get used to the new ways of LiveView, let’s put the code into the render/1 function. You can always extract it later (but remember to delete the render/1 function, if you do!).

    The first step would be to simply copy-paste everything, with the small change that you need to replace every instance of @conn with @socket. Here’s what the ThermostatLive will look like:

    defmodule HomeAutomationWeb.ThermostatLive do
      use HomeAutomationWeb, :live_view
    
      alias HomeAutomation.Thermostat
    
      def mount(_params, _session, socket) do
        thermostats = Thermostat.all_for_user(socket.assigns.current_user)
    
        {:ok, assign(socket, %{thermostats: thermostats})}
      end
    
      def render(assigns) do
        ~H"""
        <h1>Listing Thermostats</h1>
    
        <%= for thermostat <- @thermostats do %>
          <div class="thermostat">
            <div class="row">
              <div class="column">
                <ul>
                  <li>Room name: <%= thermostat.room_name %></li>
                  <li>Temperature: <%= thermostat.temperature %></li>
                </ul>
              </div>
    
              <div class="column">
                Actions: <%= link("Show", to: Routes.thermostat_path(@socket, :show, thermostat)) %>
                <%= link("Edit", to: Routes.thermostat_path(@socket, :edit, thermostat)) %>
                <%= link("Delete",
                  to: Routes.thermostat_path(@socket, :delete, thermostat),
                  method: :delete,
                  data: [confirm: "Are you sure?"]
                ) %>
              </div>
    
              <div class="column">
                <%= form_for %{}, Routes.thermostat_path(@socket, :reboot), fn f -> %>
                  <%= hidden_input(f, :id, value: thermostat.id) %>
                  <%= submit("Reboot", class: "rounded-full") %>
                <% end %>
              </div>
            </div>
          </div>
        <% end %>
    
        <%= link("New Thermostat", to: Routes.thermostat_path(@socket, :new)) %>
        """
      end
    end
    

    While this makes the page render, both the links and the form still perform the same “dead” navigation as before, leading to full-page reloads, not to mention taking us out of the live page entirely.

    To make the page more live, let’s focus on making the clicking of the Reboot-button result in a LiveView event, instead of a regular POST with subsequent redirect.

    Changing the button to something live

    The Reboot-button is a good target to turn live, as it should just fire an asynchronous event, without redirecting anywhere. Let’s have a look at how the button is currently defined:

    <%= form_for %{}, Routes.thermostat_path(@socket, :reboot), fn f -> %>
      <%= hidden_input(f, :id, value: thermostat.id) %>
      <%= submit("Reboot", class: "rounded-full") %>
    <% end %>
    

    The reason why the “dead” template used a form_for with a submit is two-fold. Firstly, since the action of rebooting the thermostat is not a navigation action, using an anchor tag (<a>) styled to look like a button would not be appropriate: using a form with a submit button is better, since it indicates that an action will be performed, and the action is clearly defined by the form’s method and action attributes. Secondly, a form allows you to include a CSRF token, which is automatically injected into the resulting <form> with form_for.

    Let’s look at what the live version will look like:

    <%= link("Reboot",
      to: "#",
      phx_click: "reboot",
      phx_value_id: thermostat.id,
      data: [confirm: "Are you sure?"]
    ) %>
    

    Let’s break this down a bit:

    A note about <form>

    First thing to note: this is no longer a <form>!

    Above I mentioned CSRF protection being a reason for using the <form>, but the Channel (i.e. the WebSocket connection between server and client) is already protected with a CSRF token, so we can send LiveView events without worrying about this.

    The detail above about navigation technically still applies, but in LiveView one would (generally) use a link with to: "#" for most things functioning like a button.

    As a minor note: you’ll still be using forms in LiveView for data input, although you’ll be using the <.form> component, instead of calling form_for.

    The phx_click event

    The second thing to note is the phx_click attribute and its value, “reboot”. The key indicates what event should be fired when interacting with the generated <a> tag. The various possible event bindings can be found here:

    https://hexdocs.pm/phoenix_live_view/bindings.html

    If you want to have a reference for what events you can work with in LiveView, the link above is a good one to bookmark!

    Clarifying a potentially confusing detail: the events listed in the above linked documentation use hyphens (-) as separators in their names. link uses underscores (_), but apart from this, the event names are the same.

    The “reboot” string specifies the “name” of the event that is sent to the server. We’ll see the usage of this string in a second.

    The value attribute

    Finally, let’s talk about the phx_value_id attribute. phx_value_id is special, in that part of the attribute name is user defined. The phx_value_-part of the attribute name indicates to LiveView that the attribute is an “event value”, and what follows after phx_value_ (in our case: id) will be the key name in the resulting “event data map” on the server side. The value of the attribute will become the value in the map.

    This means that this…:

    phx_value_id: "thermostat_13",

    …will be received as the following on the server:

    %{id: "thermostat_13"}

    Further explanation can be found in the documentation:

    https://hexdocs.pm/phoenix_live_view/bindings.html#click-events

    Adding the corresponding event to the LiveView module

    Now that we’ve changed the Reboot-button in the template, we can get to the final step: amending the ThermostatLive module to react to the “reboot” event. We need to add a handle_event function to the module, and we’ll use the logic that we saw earlier in ThermostatController.reboot/2:

    defmodule HomeAutomationWeb.ThermostatLive do
      use HomeAutomationWeb, :live_view
    
      alias HomeAutomation.Thermostat
    
      def mount(_params, _session, socket) do
        # ...
      end
    
      def handle_event("reboot", %{"id" => id}, socket) do
        {:ok, thermostat} =
          id
          |> Thermostat.get!()
          |> Thermostat.reboot()
    
        {:noreply,
          put_flash(
            socket,
            :info,
            "Thermostat '#{thermostat.room_name}' rebooted."
          )}
      end
    
      def render(assigns) do
        # ...
      end
    end
    

    This handle_event function will react to the “reboot” event. The first argument to the function is the event name, the second is any passed data (through phx-value-*), and finally the socket.

    A quick note about the :noreply: presume that you’ll be using {:noreply, socket}, as the alternative ({:reply, map, socket}) is rarely useful. Just don’t worry about this, for now.

    That’s it!

    If you’ve been following this guide, trying to adapt it to your application, then you should have something like the following:

    1. A live route.
    2. A live module, where you’ve ported some of the logic from the controller module.
    3. A template that’s been adapted to be rendered by a live module.
    4. An element on the page that, when interacted with, causes an event to fire, with no need for a page refresh.

    At this stage, one would probably want to address the other CRUD actions, at the very least having their navigation point to the live route, e.g. creating a new thermostat should not result in a redirect to the dead route. Even better would be to have the CRUD actions all be changed to be fully live, requiring no page reloads. However, this is unfortunately outside of the scope of this guide.

    I hope that this guide has helped you to take your first steps toward working with LiveView!

    Further reading

    Here’s some closing advice that you might find useful, if you want to continue on your own.

    Exploring generators

    A very educative thing to do is comparing what code Phoenix generates for “dead” pages vs. live pages.

    Following are the commands for first generating a “dead” CRUD page setup for a context (Devices) and entity (Thermostat), and then generating the same context and entity in a live fashion. The resulting git commits illustrate how the same intent is expressed in the two styles.

    $ mix phx.new home_automation --live
    $ cd home_automation
    $ git init .
    $ git add .
    $ git commit -m "Initial commit"
    $ mix phx.gen.html Devices Thermostat thermostats room_name:string temperature:integer
    $ git add .
    $ git commit -m "Added Devices context with Thermostat entity"
    $ git show
    $ mix phx.gen.live Devices Thermostat thermostats room_name:string temperature:integer
    $ git add .
    $ git commit -m "Added Live version of Devices with Thermostat"
    $ git show
    

    Note that when you get to the phx.gen.live step, you’ll have to answer Y to a couple of questions, as you’ll be overwriting some code. Also, you’ll generate a superfluous Ecto migration, which you can ignore.

    Study these generated commits, the resulting files, and the difference between the generated approaches, as it helps a lot with understanding how the transition from dead to live is done.

    Broadcasting events

    You might want your live module to react to specific events in your application. In the case of the thermostat application it could be the change of temperature on any of the thermostats, or the reboot status getting updated asynchronously. In the case of a LiveView chat application, it would be receiving a new message from someone in the conversation.

    A very commonly used method for generating and listening to events is making use of Phoenix.PubSub . Not only is Phoenix.PubSub a robust solution for broadcasting events, it is also pulled in as a dependency of Phoenix, so you should already have the package installed.

    There are numerous guides out there for how to make use of Phoenix.PubSub, but a good place to start is probably watching how Chris McCord uses LiveView and Phoenix.PubSub to create a Twitter clone, in about 15 minutes (the part with Phoenix.PubSub is about half-way through the video).

    HTTP verbs

    Regarding HTTP verbs, coming from the world of dead routes, you might be wondering:

    I’ve got various GET/POST/PUT/etc. routes that serve different purposes. When building live modules, do all of the routes (with their different HTTP verbs) just get replaced with live?

    Yes, mostly. Generally, the live parts of your application will handle their communication over the WebSocket connection, sending various events. This means that any meaning you would otherwise communicate through the various HTTP verbs will instead be conveyed through events.

    With that said, you may still have parts of your application that are accessed through regular HTTP requests, which would be a reason to keep those routes around. They will not, however, be called from your live components.

    Credits

    Last year, Stone Filipczak wrote an excellent guide on the SmartLogic blog on how to quickly introduce LiveView to an existing Phoenix app. It was difficult not to overlap with that guide, so my intention has been to complement it. Either way, I encourage you to check it out!

    The post You’ve been curious about LiveView, but you haven’t gotten into it appeared first on Erlang Solutions .


      www.erlang-solutions.com/blog/youve-been-curious-about-liveview-but-you-havent-gotten-into-it/


      Erlang Solutions: Change Data Capture with Postgres and Elixir

      news.movim.eu / PlanetJabber • 5 April, 2023 • 20 minutes

    Change data capture (CDC) is the process of identifying and capturing data changes in a database.

    With change data capture, changes to data can be tracked in near real time, and that information can be used to support a variety of use cases, including auditing, replication, and synchronization.

    A good example of a use case for change data capture is an application that inserts a record into the database and sends an event to a message queue after the record has been inserted (write twice).

    Imagine you are working on an e-commerce application: after an order is created and inserted into the database, an OrderCreated event is sent to a message queue. Consumers of the event might do things like create picking orders for the warehouse, schedule transports for delivery, and send an order confirmation email to the customer.

    But what happens if the application crashes after the order has been inserted into the database but before it manages to send the event to the message queue? This can happen because you cannot atomically insert the record AND send the message in the same transaction; if the application crashes after inserting the record but before sending the event, the event is lost.

    Of course, there are workarounds: a simple solution is to “store” the event in an outbox table within the same transaction that writes the record, and then rely on a CDC process to capture the change in the outbox table and send the event to the message queue. The transaction is atomic, and the CDC process can ensure the event is delivered at least once.
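    The outbox approach can be sketched with Ecto.Multi; the module, schema, and field names below are hypothetical, not part of this article's code:

    ```elixir
    # Hypothetical sketch of the outbox fix for the "write twice" problem:
    # the order row and its outbox event are inserted in one atomic
    # transaction; a separate CDC process later forwards the event.
    defmodule Shop.Orders do
      alias Ecto.Multi

      def create_order(repo, attrs) do
        Multi.new()
        |> Multi.insert(:order, Order.changeset(%Order{}, attrs))
        |> Multi.insert(:event, fn %{order: order} ->
          %OutboxEvent{name: "OrderCreated", payload: %{order_id: order.id}}
        end)
        |> repo.transaction()
      end
    end
    ```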

    To capture changes, CDC typically uses one of two methods: log-based or trigger-based.

    Log-based CDC involves reading the database’s transaction logs to identify data changes, which is the method we will use here by relying on Postgres logical replication.

    Postgres Replication

    There are two modes of replication in Postgres:

    1. Physical replication: every change from the primary is streamed to the replicas via the WAL (Write-Ahead Log). This replication is performed byte by byte, with exact block addresses.
    2. Logical replication: the subscriber receives each individual transaction change (i.e. INSERT, UPDATE, or DELETE statements) in the database.

    The WAL is still streamed, but it encodes the logical operations so that they can be decoded by the subscriber without knowledge of Postgres internals.

    One of the big advantages of logical replication is that it can be used to replicate only specific tables or rows, which means you have complete control over what is being replicated.
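    For example, a publication can be restricted when it is created; note that row filters and column lists require Postgres 15 or newer, and the table here is the articles table defined below:

    ```sql
    -- Publish only some columns of a table (Postgres 15+)
    CREATE PUBLICATION titles_pub FOR TABLE articles (id, title);

    -- Publish only rows matching a predicate (Postgres 15+)
    CREATE PUBLICATION long_articles_pub FOR TABLE articles
      WHERE (length(body) > 100);
    ```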

    To enable logical replication, wal_level must be configured:

    -- determines how much information is written to the wal. 
    -- Each 'level' inherits the level below it; 'logical' is the highest level
    
    ALTER SYSTEM SET wal_level=logical;
    
    -- simultaneously running WAL sender processes
    ALTER SYSTEM SET max_wal_senders='10';
    
    -- simultaneously defined replication slots
    ALTER SYSTEM SET max_replication_slots='10';
    

    The changes require a restart of the Postgres instance.

    After the restart, wal_level can be verified with:

    SHOW wal_level;
     wal_level 
    -----------
     logical
    (1 row)
    

    To subscribe to changes, a publication must be created. A publication is a group of tables in which we would like to receive data changes.

    Let’s create a simple table and define a publication for it:

    CREATE TABLE articles (id serial PRIMARY KEY, title text, description text, body text);
    CREATE PUBLICATION articles_pub FOR TABLE articles;
    

    To tell Postgres to retain WAL segments, we must create a replication slot.

    A replication slot represents a stream of changes from one or more publications and is used to prevent data loss in the event of a server failure, since slots are crash-safe.
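    Slots can also be created and inspected directly from SQL, which is handy for checking what the examples below set up (the slot name is illustrative):

    ```sql
    -- Create a logical replication slot using the pgoutput plugin
    SELECT * FROM pg_create_logical_replication_slot('articles_slot', 'pgoutput');

    -- List existing slots and whether a consumer is currently attached
    SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;

    -- Drop the slot when it is no longer needed, releasing retained WAL
    SELECT pg_drop_replication_slot('articles_slot');
    ```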

    Replication Protocol

    To get a feel for the protocol and the messages being sent, we can use pg_recvlogical to start a replication subscriber:

    # Start and use the publication defined above
    # output is written to stdout
    pg_recvlogical --start \
      --host='localhost' \
      --port='5432' \
      --username='postgres' \
      --dbname='postgres' \
      --option=publication_names='articles_pub' \
      --option=proto_version=1 \
      --create-slot \
      --if-not-exists \
      --slot=articles_slot \
      --plugin=pgoutput \
      --file=-
    

    Insert a record:

    INSERT INTO articles (title, description, body)
        VALUES ('Postgres replication', 'Using logical replication', 'Foo bar baz');
    

    Each line in the output corresponds to a replication message received through the subscription:

    B(egin) - Begin transaction 
    R(elation) - Table, schema, columns and their types
    I(insert) - Data being inserted
    C(ommit) - Commit transaction
    
    ___________________________________
    
    B
    
    Rarticlesdidtitledescriptionbody
    It35tPostgres replicationtUsing logical replicationtFoo bar baz
    C
    

    If we insert multiple records in one transaction, we should see two I messages between B and C:

    BEGIN;
    INSERT INTO articles (title, description, body) VALUES ('First', 'desc', 'Foo');
    
    INSERT INTO articles (title, description, body) VALUES ('Second', 'desc', 'Bar');
    COMMIT;
    

    And the output:

    C
    B
    
    It37tFirsttdesctFoo
    It38tSecondtdesctBar
    C
    

    The relation information, i.e. the table, was not transmitted because the relation was already received when the first record was inserted.

    Postgres only sends the relation the first time it is encountered during the session. The subscriber is expected to cache a previously sent relation.

    Now that we have an idea of how logical replication works, let’s implement it in Elixir!

    Implementing the replication connection

    Create a new Elixir project:

    mix new cdc
    

    We’ll add the following dependencies to mix.exs:

    defp deps do
      [
        {:postgrex, "~> 0.16.4"},
        # decode/encode replication messages
        {:postgrex_pgoutput, "~> 0.1.0"}
      ]
    end
    

    Postgrex supports replication through the Postgrex.ReplicationConnection process.

    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
      require Logger
    
      defstruct [
        :publications,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok, %__MODULE__{slot: slot, publications: pubs}}
      end
    
      
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result([%Postgrex.Result{} | _], %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        Logger.debug("Received msg=#{inspect(msg, limit: :infinity, pretty: true)}")
        {:noreply, [], state}
      end
    
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    

    The code is available on GitHub.

    Let’s try it out:

    opts = [
      slot: "articles_slot_elixir",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432
    ]
    
    CDC.Replication.start_link(opts)
    

    When we start the process, the following happens:

    1. Once we are connected to Postgres, the handle_connect/1 callback is invoked and a temporary logical replication slot is created.
    2. handle_result/2 is called with the result of the query from step 1. If the slot was created successfully, we start streaming from the slot and enter streaming mode. The requested position ‘0/0’ means that Postgres picks the position.
    3. Any replication message sent from Postgres is received in the handle_data/2 callback.

    Replication messages

    There are two types of messages a subscriber receives:

    1. primary_keep_alive: a check-in message; if reply == 1, the subscriber is expected to respond with a standby_status_update to avoid a timeout disconnect.

    The standby_status_update contains the current LSN the subscriber has processed.

    Postgres uses this message to determine which WAL segments can be safely removed.

    2. xlog_data: contains the data messages for each step in a transaction. Since we are not responding to the primary_keep_alive messages, the process disconnects and restarts.

    Let’s fix this by decoding the messages and starting to reply with standby_status_update messages.

    
    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      require Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.{Messages, Lsn}
    
      require Logger
    
      defstruct [
        :publications,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok, %__MODULE__{slot: slot, publications: pubs}}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        return_msgs =
          msg
          |> Messages.decode()
          |> handle_msg()
    
        {:noreply, return_msgs, state}
      end
    
      #
      defp handle_msg(Messages.msg_primary_keep_alive(server_wal: lsn, reply: 1)) do
        Logger.debug("msg_primary_keep_alive message reply=true")
        <<lsn::64>> = Lsn.encode(lsn)
    
        [standby_status_update(lsn)]
      end
    
      defp handle_msg(Messages.msg_primary_keep_alive(reply: 0)), do: []
    
      defp handle_msg(Messages.msg_xlog_data(data: data)) do
        Logger.debug("xlog_data message: #{inspect(data, pretty: true)}")
        []
      end
    
      defp standby_status_update(lsn) do
        [
          wal_recv: lsn + 1,
          wal_flush: lsn + 1,
          wal_apply: lsn + 1,
          system_clock: Messages.now(),
          reply: 0
        ]
        |> Messages.msg_standby_status_update()
        |> Messages.encode()
      end
    
      
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    

    handle_data/2 decodes the message and passes it to handle_msg/1. If it is a primary_keep_alive, we respond with a standby_status_update.

    The LSN denotes a byte position in the WAL.

    The subscriber replies with the LSN it has currently handled; since we are not keeping track of the messages we receive, we simply acknowledge with the LSN sent from the server.
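    To make the LSN concrete: in its textual form it is two hexadecimal numbers separated by a slash, which together form a 64-bit byte position in the WAL. A small self-contained sketch (not part of the article's code) of the conversion:

    ```elixir
    # An LSN such as "0/22981920" is two 32-bit halves (high/low) of a
    # 64-bit WAL byte position: (high <<< 32) + low.
    defmodule LsnSketch do
      import Bitwise

      def to_integer(lsn) when is_binary(lsn) do
        [high, low] = String.split(lsn, "/")
        bsl(String.to_integer(high, 16), 32) + String.to_integer(low, 16)
      end
    end

    IO.inspect(LsnSketch.to_integer("0/16B2D90"))
    ```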

    Next, we’ll handle the xlog_data messages; the idea here is to capture each operation in a transaction struct.

    Capturing transactions

    The CDC.Protocol module will handle the xlog_data messages and track the state of the transaction.

    defmodule CDC.Protocol do
      import Postgrex.PgOutput.Messages
      require Logger
    
      alias CDC.Tx
      alias Postgrex.PgOutput.Lsn
    
      
      @type t :: %__MODULE__{
              tx: Tx.t(),
              relations: map()
            }
    
      defstruct [
        :tx,
        relations: %{}
      ]
    
      @spec new() :: t()
      def new do
        %__MODULE__{}
      end
    
      def handle_message(msg, state) when is_binary(msg) do
        msg
        |> decode()
        |> handle_message(state)
      end
    
      def handle_message(msg_primary_keep_alive(reply: 0), state), do: {[], nil, state}
      def handle_message(msg_primary_keep_alive(server_wal: lsn, reply: 1), state) do
        Logger.debug("msg_primary_keep_alive message reply=true")
        <<lsn::64>> = Lsn.encode(lsn)
    
        {[standby_status_update(lsn)], nil, state}
      end
    
      def handle_message(msg, %__MODULE__{tx: nil, relations: relations} = state) do
        tx =
          [relations: relations, decode: true]
          |> Tx.new()
          |> Tx.build(msg)
    
        {[], nil, %{state | tx: tx}}
      end
    
      def handle_message(msg, %__MODULE__{tx: tx} = state) do
        case Tx.build(tx, msg) do
          %Tx{state: :commit, relations: relations} ->
            tx = Tx.finalize(tx)
            relations = Map.merge(state.relations, relations)
            {[], tx, %{state | tx: nil, relations: relations}}
    
          tx ->
            {[], nil, %{state | tx: tx}}
        end
      end
    
      defp standby_status_update(lsn) do
        [
          wal_recv: lsn + 1,
          wal_flush: lsn + 1,
          wal_apply: lsn + 1,
          system_clock: now(),
          reply: 0
        ]
        |> msg_standby_status_update()
        |> encode()
      end
    end
    

    CDC.Tx handles the messages received within the transaction: begin, relation, insert/update/delete, and commit.

    defmodule CDC.Tx do
      import Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.Lsn
    
      alias __MODULE__.Operation
    
      @type t :: %__MODULE__{
              operations: [Operation.t()],
              relations: map(),
              timestamp: term(),
              xid: pos_integer(),
              state: :begin | :commit,
              lsn: Lsn.t(),
              end_lsn: Lsn.t()
            }
    
      defstruct [
        :timestamp,
        :xid,
        :lsn,
        :end_lsn,
        relations: %{},
        operations: [],
        state: :begin,
        decode: true
      ]
    
      def new(opts \\ []) do
        struct(__MODULE__, opts)
      end
    
      def finalize(%__MODULE__{state: :commit, operations: ops} = tx) do
        %{tx | operations: Enum.reverse(ops)}
      end
    
      def finalize(%__MODULE__{} = tx), do: tx
    
      @spec build(t(), tuple()) :: t()
      def build(tx, msg_xlog_data(data: data)) do
        build(tx, data)
      end
    
      def build(tx, msg_begin(lsn: lsn, timestamp: ts, xid: xid)) do
        %{tx | lsn: lsn, timestamp: ts, xid: xid, state: :begin}
      end
    
      def build(%__MODULE__{state: :begin, relations: relations} = tx, msg_relation(id: id) = rel) do
        %{tx | relations: Map.put(relations, id, rel)}
      end
    
      def build(%__MODULE__{state: :begin, lsn: tx_lsn} = tx, msg_commit(lsn: lsn, end_lsn: end_lsn))
          when tx_lsn == lsn do
        %{tx | state: :commit, end_lsn: end_lsn}
      end
    
      def build(%__MODULE__{state: :begin} = builder, msg_insert(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      def build(%__MODULE__{state: :begin} = builder, msg_update(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      def build(%__MODULE__{state: :begin} = builder, msg_delete(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      # skip unknown messages
      def build(%__MODULE__{} = tx, _msg), do: tx
    
      defp build_op(%__MODULE__{state: :begin, relations: rels, decode: decode} = tx, id, msg) do
        rel = Map.fetch!(rels, id)
        op = Operation.from_msg(msg, rel, decode)
    
        %{tx | operations: [op | tx.operations]}
      end
    end
    

    CDC.Tx.Operation handles the INSERT/UPDATE/DELETE messages and decodes the data by combining it with the relation.

    defmodule CDC.Tx.Operation do
      @moduledoc "Describes a change (INSERT, UPDATE, DELETE) within a transaction."
    
      import Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.Type, as: PgType
    
      @type t :: %__MODULE__{}
      defstruct [
        :type,
        :schema,
        :namespace,
        :table,
        :record,
        :old_record,
        :timestamp
      ]
    
      @spec from_msg(tuple(), tuple(), decode :: boolean()) :: t()
      def from_msg(
            msg_insert(data: data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :insert,
          namespace: ns,
          schema: into_schema(columns),
          table: name,
          record: cast(data, columns, decode?),
          old_record: %{}
        }
      end
    
      def from_msg(
            msg_update(change_data: data, old_data: old_data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :update,
          namespace: ns,
          table: name,
          schema: into_schema(columns),
          record: cast(data, columns, decode?),
          old_record: cast(old_data, columns, decode?)
        }
      end
    
      def from_msg(
            msg_delete(old_data: data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :delete,
          namespace: ns,
          schema: into_schema(columns),
          table: name,
          record: %{},
          old_record: cast(data, columns, decode?)
        }
      end
    
      defp into_schema(columns) do
        for c <- columns do
          c
          |> column()
          |> Enum.into(%{})
        end
      end
    
      defp cast(data, columns, decode?) do
        Enum.zip_reduce([data, columns], %{}, fn [text, typeinfo], acc ->
          key = column(typeinfo, :name)
    
          value =
            if decode? do
              t =
                typeinfo
                |> column(:type)
                |> PgType.type_info()
    
              PgType.decode(text, t)
            else
              text
            end
    
          Map.put(acc, key, value)
        end)
      end
    end
    

    As before, a primary_keep_alive message with reply == 1 sends a standby_status_update. When we receive an xlog_data message, we create a new %Tx{} that we use to “build up” the transaction until we receive a msg_commit, which marks the end of the transaction.

    Any insert, update, or delete message creates a CDC.Tx.Operation in the transaction; each operation contains a relation_id that is used to look up the relation from tx.relations.

    The operation together with the relation allows us to decode the data. Column and type information is retrieved from the relation and used to decode the values into Elixir terms.

    Once we are in a commit state, we merge Tx.relations with Protocol.relations, since a relation message is only transmitted the first time a table is encountered during the connection session; Protocol.relations contains all the msg_relation messages that have been sent to us during the session.

    The CDC.Replication module now looks like this:

    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      alias CDC.Protocol
    
      require Logger
    
      defstruct [
        :publications,
        :protocol,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok,
         %__MODULE__{
           slot: slot,
           publications: pubs,
           protocol: Protocol.new()
         }}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
    
        if not is_nil(tx) do
          Logger.debug("Tx: #{inspect(tx, pretty: true)}")
        end
    
        {:noreply, return_msgs, %{state | protocol: protocol}}
      end
    
      
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    

    handle_data/2 calls Protocol.handle_message/2, which returns a tuple with three elements: {messages_to_send :: [binary()], complete_transaction :: CDC.Tx.t() | nil, CDC.Protocol.t()}

    For now we just inspect the transaction when it is emitted from Protocol.handle_message/2; let’s try it out:

    Interactive Elixir (1.14.0) - press Ctrl+C to exit (type h() ENTER for help)
    opts = [
      slot: "articles_slot_elixir",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432
    ]
    
    {:ok, _} = CDC.Replication.start_link(opts)
    {:ok, pid} = Postgrex.start_link(opts)
    
    insert_query = """
    INSERT INTO articles (title, description, body) 
    VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
    """
    
    _ = Postgrex.query!(pid, insert_query, [])
      
    14:03:48.020 [debug] Tx: %CDC.Tx{
      timestamp: ~U[2022-10-31 13:03:48Z],
      xid: 494,
      lsn: {0, 22981920},
      end_lsn: nil,
      relations: %{
        16386 => {:msg_relation, 16386, "public", "articles", :default,
         [
           {:column, [:key], "id", :int4, -1},
           {:column, [], "title", :text, -1},
           {:column, [], "description", :text, -1},
           {:column, [], "body", :text, -1}
         ]}
      },
      operations: [
        %CDC.Tx.Operation{
          type: :insert,
          schema: [
            %{flags: [:key], modifier: -1, name: "id", type: :int4},
            %{flags: [], modifier: -1, name: "title", type: :text},
            %{flags: [], modifier: -1, name: "description", type: :text},
            %{flags: [], modifier: -1, name: "body", type: :text}
          ],
          namespace: "public",
          table: "articles",
          record: %{
            "body" => "with Elixir!",
            "description" => "Using logical replication",
            "id" => 6,
            "title" => "Postgres replication"
          },
          old_record: %{},
          timestamp: nil
        }
      ],
      state: :begin,
      decode: true
    }
    

    Every change in the transaction is stored in Tx.operations; operation.record is the decoded row as a map.
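    Downstream code can then pattern match on these operations; a hypothetical consumer of a finished transaction might look like:

    ```elixir
    # Hypothetical handler: react to the decoded operations of a committed
    # transaction, e.g. to forward outbox rows to a message queue.
    defmodule CDC.Handler do
      def handle_tx(%CDC.Tx{operations: operations}) do
        Enum.each(operations, fn
          %CDC.Tx.Operation{type: :insert, table: "articles", record: record} ->
            IO.inspect(record, label: "article created")

          _other ->
            :ignore
        end)
      end
    end
    ```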

    Finally, let’s implement a way to subscribe to changes from CDC.Replication:

    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      alias CDC.Protocol
    
      require Logger
    
      defstruct [
        :publications,
        :protocol,
        :slot,
        :state,
        subscribers: %{}
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      def subscribe(pid, opts \\ []) do
        Postgrex.ReplicationConnection.call(pid, :subscribe, Keyword.get(opts, :timeout, 5_000))
      end
    
      def unsubscribe(pid, ref, opts \\ []) do
        Postgrex.ReplicationConnection.call(
          pid,
          {:unsubscribe, ref},
          Keyword.get(opts, :timeout, 5_000)
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok,
         %__MODULE__{
           slot: slot,
           publications: pubs,
           protocol: Protocol.new()
         }}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
    
        if not is_nil(tx) do
          notify(tx, state.subscribers)
        end
    
        {:noreply, return_msgs, %{state | protocol: protocol}}
      end
    
      # Replies must be sent using `reply/2`
      # https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html#reply/2
      @impl true
      def handle_call(:subscribe, {pid, _} = from, state) do
        ref = Process.monitor(pid)
    
        state = put_in(state.subscribers[ref], pid)
    
        Postgrex.ReplicationConnection.reply(from, {:ok, ref})
    
        {:noreply, state}
      end
    
      def handle_call({:unsubscribe, ref}, from, state) do
        {reply, new_state} =
          case state.subscribers do
            %{^ref => _pid} ->
              Process.demonitor(ref, [:flush])
    
              {_, state} = pop_in(state.subscribers[ref])
              {:ok, state}
    
            _ ->
              {:error, state}
          end
    
        from && Postgrex.ReplicationConnection.reply(from, reply)
    
        {:noreply, new_state}
      end
    
      @impl true
      def handle_info({:DOWN, ref, :process, _, _}, state) do
        handle_call({:unsubscribe, ref}, nil, state)
      end
    
      defp notify(tx, subscribers) do
        for {ref, pid} <- subscribers do
          send(pid, {:notification, self(), ref, tx})
        end
    
        :ok
      end
    
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    
    

    And we can use it like this:

    opts = [
      slot: "articles_slot",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432
    ]
    
    {:ok, pid} = CDC.Replication.start_link(opts)
    {:ok, pg_pid} = Postgrex.start_link(opts)
    {:ok, ref} = CDC.Replication.subscribe(pid)
    
    insert_query = """
    INSERT INTO articles (title, description, body) 
    VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
    """
    
    _ = Postgrex.query!(pg_pid, insert_query, [])
    flush()
    
    {:notification, #PID<0.266.0>, #Reference<0.2499916608.3416784901.94813>,
     %CDC.Tx{
       timestamp: ~U[2022-10-31 13:26:35Z],
       xid: 495,
       lsn: {0, 22983536},
       end_lsn: nil,
       relations: %{
         16386 => {:msg_relation, 16386, "public", "articles", :default,
          [
            {:column, [:key], "id", :int4, -1},
            {:column, [], "title", :text, -1},
            {:column, [], "description", :text, -1},
            {:column, [], "body", :text, -1}
          ]}
       },
       operations: [
         %CDC.Tx.Operation{
           type: :insert,
           schema: [
             %{flags: [:key], modifier: -1, name: "id", type: :int4},
             %{flags: [], modifier: -1, name: "title", type: :text},
             %{flags: [], modifier: -1, name: "description", type: :text},
             %{flags: [], modifier: -1, name: "body", type: :text}
           ],
           namespace: "public",
           table: "articles",
           record: %{
             "body" => "with Elixir!",
             "description" => "Using logical replication",
             "id" => 7,
             "title" => "Postgres replication"
           },
           old_record: %{},
           timestamp: nil
         }
       ],
       state: :begin,
       decode: true
     }}
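
Outside of iex, a subscriber would typically pattern-match on the transaction's operations list. Below is a minimal, illustrative sketch (the operation shape matches the notification above; the `CDC.Consumer` module and its helper are our own invention, not part of the library):

```elixir
defmodule CDC.Consumer do
  # Extracts {table, record} pairs for every insert in a decoded transaction.
  # It relies only on the operation shape shown above: each operation carries
  # :type, :table and :record fields.
  def inserted_rows(tx) do
    for op <- tx.operations, op.type == :insert do
      {op.table, op.record}
    end
  end
end

# A subscriber process could then consume notifications like this:
#
#   {:ok, ref} = CDC.Replication.subscribe(pid)
#
#   receive do
#     {:notification, _pid, ^ref, tx} -> CDC.Consumer.inserted_rows(tx)
#   end
```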
    

    Conclusion

    If you are looking for a way to capture changes from your database with minimal changes to your existing setup, Change Data Capture is definitely worth considering. With Elixir and postgrex we have implemented a mini Debezium in ~400 LOC. The full source is available here.

    If you need help with your Elixir implementation, our world-leading team of experts is always here to help you. Contact us today to find out how we can help.

    The post Change Data Capture with Postgres and Elixir appeared first on Erlang Solutions.

    • chevron_right

      Ignite Realtime Blog: Spark 3.0.2 Released

      news.movim.eu / PlanetJabber • 31 March, 2023 • 1 minute

    The Ignite Realtime community is happy to announce the availability of Spark version 3.0.2

    The release contains bug fixes and updates two plugins: Translator and Roar.

    Many Spark translations are incomplete. Please help us translate Spark.

    A full list of changes can be found in the changelog.

    We encourage users and developers to get involved with the Spark project by providing feedback in the forums or submitting pull requests on our GitHub page.

    You can download Spark from the Downloads page. Below are the sha256 checksums:

    db7febafd05a064ffed4800066d11d0a2d27288aa3c0b174ed21a20672ab4669 *spark_3_0_2-64bit.exe
    647712b43942f7dc399901c9078ea1de7fcca14de3b5898bc3bdeee7751cf1b3 *spark_3_0_2-64bit.msi
    c1415417081095656c5fd7eb42e09ac6168f550af3c5254eb35506a404283a3c *spark_3_0_2-arm.exe
    78994eaa31d6c074bc72b9ac7a1f38f63488f9272c1a8870524f986dda618b6b *spark_3_0_2-with-jre.dmg
    ef3ba8eef5b88edc5e4ce9e13e9fa41ef2fad136cc6b518c52da79051c2a7c39 *spark_3_0_2-with-jre.exe
    d18dd7613333647f3fae6728731df7e3ef957103a84686ce1bb7befb9d32285f *spark_3_0_2-with-jre.msi
    39b03b4f4363bc053d2c5ed0c7452f97ced38a1c307ab77ce7f8b0e36459f498 *spark_3_0_2.deb
    5bc3bf000b4fe8a49d424ea53ccd6b62dae9291e987394f08c62ff0d69f0aec9 *spark_3_0_2.dmg
    e511c10db5f72e8b311f79dc3ac810040d44c5488f606cb860d7250d1dcf3709 *spark_3_0_2.exe
    f54d3990dd0ca76a1ca447af0bef9b4244861c7da9f6ab38a755c9cc578344c8 *spark_3_0_2.msi
    fff7fa157fe187b5307ef2383c87b8193a0416ffe41ffcb2ea0b0e6672a917f9 *spark_3_0_2.rpm
    a5cf06ccbe82dc308f2d2493bc0d89f552729fb993af392003263f0cd9caac16 *spark_3_0_2.sh
    03eae1e4e88fdc303752f3b5c0ef8bb00653cfdf28ee964699bba892e45110e4 *spark_3_0_2.tar.gz
    

    For other release announcements and news, follow us on Twitter and Mastodon.

    1 post - 1 participant

    Read full topic

    • chevron_right

      Erlang Solutions: 5 Key Tech Priorities for Fintech Leaders in 2023

      news.movim.eu / PlanetJabber • 30 March, 2023 • 6 minutes

    The fintech industry is a major disruptor. Each year, it impacts how consumers interact with financial companies and brings new and innovative means to meet ever-growing customer expectations and occupy market space.

    As a business owner or executive in this space, you have no choice but to stay on top of your game to increase efficiency.

    In simpler terms, if your business doesn’t scale, it could fail.

    That might sound a tad extreme, but you’re dealing in a market that is set to be worth $698.48 billion by 2030. Understanding the trends and focuses of the time helps you know where you’re headed in order to remain competitive.

    So let’s talk about these trends that drive the market.

    Embedded finance

    The embedded finance market is expected to see massive growth this year. The market has been growing at a rapid pace since 2020. Fintech companies have been steadily outpacing traditional banking services when it comes to gaining the trust of consumers.

    According to our latest report, searches for the term have increased by a staggering 488% in the last five years.

    Embedded finance solutions could make up as much as 50% of total banking revenue streams in the near future. There is also a significant market for embedded financial products in the areas of deposit accounts, payments, insurance, and lending.

    Source: McKinsey analysis

    Arguably the greatest benefit of embedded finance is that, under the direction of inclusive fintech startups, it has the potential to empower customers who were previously excluded from the conventional financial industry. Similarly, emerging markets can provide a less stifling environment with lower prices and a larger customer base, which would further encourage innovation.

    We’re also likely to see traditional financial services and fintech firms, such as banks and payment processors, collaborating more closely to adopt embedded finance. This could result in payment fintechs providing more inclusive services tailored to various business models, unlocking more potential, while banks provide the fundamental infrastructure.

    Embedded finance solutions will place much greater emphasis on technological advantages and operational capability to address your business’s current issues. Think risk and compliance management. Innovations like distributed computing and artificial intelligence (AI) will have multiple effects across all businesses, strengthening their ability to embrace embedded finance.

    AI

    Financial institutions are continuing to adopt AI and machine learning, and the industry is set to see projected savings of up to 22% by the year 2030.

    How is this possible? AI-driven chatbots and assistants can make the customer experience very efficient. They can answer customer questions, monitor spending and tailor product recommendations based directly on customer preferences.

    While chatbots do not quite replicate the human experience, their growth across the market this year means we can expect more of them in 2023.

    Source: Adobe

    Think about the price comparison sites you use when looking for insurance or travel rates. These services have been made possible through the application of Natural Language Processing (NLP), which makes it possible to take payments and offer a personalised service at any given time.

    A key element of AI technology is its ability to predict human interactions, combining the best of two worlds: intelligence and behavioural finance. What is behavioural finance? It studies how human psychology influences financial decisions, which in turn impacts market outcomes. It is considered the future of fintech, supported by the development of data analytics and the large amounts of available consumer data. This data, combined with AI algorithms, makes it possible to provide such personalised services.

    The industry is starting to recognise the goldmine that is AI. It doesn’t just drive efficiency but also allows businesses to be smarter in how they interact with customers in the market.

    Blockchain

    Blockchain is one of the most exciting trends in Fintech today. It addresses the problem of unsecured, expensive fund transfer processes, with high-speed transactions at a much lower cost.

    It works as a digital ledger that verifies, records, and facilitates various types of transactions. Its major selling point is security and autonomy for customers, as businesses and individuals can safely transfer digital assets without relying on a central authority or financial institutions.

    But this isn’t the only thing that appeals to users.

    Blockchain is used in various fintech applications, such as:

    • Decentralised finance (DeFi): Creates decentralised financial services, for example borrowing, health insurance and lending services.
    • Digital assets: Provides a secure, transparent method of storing and trading digital assets, for example NFTs and cryptocurrencies.
    • Cross-border payment services: Enables secure and fast cross-border payments.
    • Supply chain management: Provides transparent and secure methods of tracking goods and assets within supply chain networks.
    • Identity management: Creates decentralised identity systems, offering increased security and privacy.

    As blockchain eliminates the need for traditional banking intermediaries, it makes it easier for customers to use these financial offerings. This is particularly significant for developing countries, where access to traditional banks can sometimes be challenging.

    Customer Experience

    We’ve spoken a lot about customer experience expectations already, but it is integral to the future of fintech services. After all, the focus will always be on the people your business serves.

    Banks that consistently optimise the customer experience are set to grow 3.2x faster than their competitors.

    It’s up to businesses to provide seamless, user-friendly experiences to attract and retain their customers. This means focusing investment on user-centred design, more personalised services and omnichannel support.

    Put yourselves in your customer’s shoes to understand their pain points, and use that data to create customised solutions that far exceed expectations.

    Sustainability and ESG

    Any forward-thinking business will have Environmental, Social and Corporate Governance (ESG) and sustainability at the heart of its plans this year.

    Customers, investors and employees alike are keen to see businesses contribute to making society more sustainable. And with a net-zero goal to be met by 2050 (in the UK), organisations across the globe are under much greater scrutiny to implement supporting policies.

    According to FIS Global, “ESG is top of mind for financial services firms globally, with 60% of executives saying they are developing new ESG products and services.”

    So if your business isn’t environmentally friendly, it might have an impact on your customer base. Brands that care about the planet and show strategic planning are the ones set to thrive. If you don’t care, expect your customers to find a business that does.

    This increased interest in sustainability has led many financial institutions to offer a diverse portfolio of sustainable options, such as loans backed by environmental enterprises like wind or solar farms. ESG bonds are also rising in popularity.

    A trend we’ll see this year will be sustainable (green) financing. This is another way that financial regulations and products are orchestrated to meet environmentally sustainable outcomes.

    Green Financing will be a key Fintech trend that will aid the private sector in doing positive work for the environment. It will also encourage private-public alliances on financial mechanisms like green bonds.

    Looking ahead

    With the ongoing development of the Fintech industry, technological innovation must work in tandem with the economy’s call for greater financial inclusion.

    As firms across the globe embrace the potential of tech to champion the future, consider these trends and how (or if) your business is incorporating them.

    Want a more in-depth look into fintech themes and how they should inform your tech strategy and decision-making? You can download our latest whitepaper to hear from the experts.

    Our team is also on hand if you’d like a chat about your fintech project or any other projects you might have. Don’t hesitate to contact us.

    The post 5 Key Tech Priorities for Fintech Leaders in 2023 appeared first on Erlang Solutions .


    • chevron_right

      Erlang Solutions: How to debug your RabbitMQ

      news.movim.eu / PlanetJabber • 29 March, 2023 • 14 minutes

    Discover the right tools and methods for debugging RabbitMQ.

    What you will learn in this blog post.

    Our RabbitMQ consultancy customers come from a wide range of industries. As a result, we have seen almost every unexpected behaviour it can present. RabbitMQ is a complex piece of software that employs concurrency and distributed computing (via Erlang), so debugging it is not always straightforward. To get to the root cause of unexpected (and unwanted) behaviour, you need the right tools and the right methodology. In this article, we will demonstrate both, to help you learn the technique of debugging RabbitMQ.

    The problem with debugging RabbitMQ

    The inspiration for this blog post comes from a real-life example. One of our customers had the RabbitMQ Management HTTP API providing crucial information to their system. The system relied heavily on the API, specifically on the /api/queues endpoint, because the system needed to know the number of ready messages in every queue in a RabbitMQ cluster. The problem was that sometimes an HTTP request to that endpoint took up to tens of seconds (in the worst case they could not even get a response from the API at all).

    So what caused some requests to take so long? To answer that question, we tried to reproduce the issue through load testing.

    Running load tests

    We used a platform we created for MongooseIM to run our Continuous Load Testing. Here are some of the most important aspects of the platform:

    1. all services involved in a load test run inside Docker containers
    2. the load is generated by Amoc; it is an open-source tool written in Erlang for generating massively parallel loads of any kind (AMQP in our case)
    3. metrics from the system under test and from the Amoc site are collected for later analysis.

    The diagram below depicts a logical architecture of an example load test with RabbitMQ:

    In the diagram, the left-hand side shows a cluster of Amoc nodes that emulate AMQP clients which, in turn, generate the load against RabbitMQ. On the other side, we can see a RabbitMQ cluster that serves the AMQP clients. All metrics from both the Amoc and RabbitMQ services are collected and stored in an InfluxDB database.

    Slow Management HTTP API queries

    We tried to reproduce the slow Management HTTP API queries in our load tests. The test scenario was fairly simple. A bunch of publishers published messages to the default exchange. Messages from each publisher were routed to a dedicated queue (every publisher had a dedicated queue). There were also consumers attached to every queue. Queue mirroring was enabled.

    For concrete values, check the table below:

    That setup stressed the Rabbit servers on our infrastructure, as seen in the graphs below:

    Each RabbitMQ node consumed about 6 (out of 7) CPU cores and roughly 1.4 GB of RAM, except for rabbitmq-1, which consumed significantly more than the others. That was probably because it had to serve more Management HTTP API requests than the other two nodes.

    During the load test, the /api/queues endpoint was queried every two seconds for the list of all queues together with their corresponding messages_ready values. A query looked like this:

    http://rabbitmq-1:15672/api/queues?columns=name,messages_ready

    Here are the results of the test:

    The figure above shows the query time during a load test. Clearly, things are very slow: the median is 1.5 seconds, while the 95th, 99th, 99.9th and max percentiles reach 20 seconds.

    Debugging

    Once the issue was confirmed and could be reproduced, we were ready to start debugging. The first idea was to find the Erlang function that is called when a request to the RabbitMQ Management HTTP API arrives, and then determine where that function spends its execution time. If we could do this, it would allow us to locate the most expensive code behind the API.

    Finding the entry-point function

    To find the function we were looking for, we took the following steps:

    1. we searched through the RabbitMQ Management plugin to find the appropriate “HTTP path to function” mapping,
    2. we used Erlang tracing to check whether a found function is called when a request arrives.

    The Management plugin uses cowboy (an Erlang HTTP server) underneath to serve the API requests. Each HTTP endpoint requires a cowboy callback module, so we easily found the rabbit_mgmt_wm_queues:to_json/2 function, which appeared to handle requests arriving at /api/queues. We confirmed that with tracing (using the recon library, which ships with RabbitMQ by default).

    root@rmq-test-rabbitmq-1:/rabbitmq_server-v3.7.9# erl -remsh rabbit@rmq-test-rabbitmq-1 -sname test2 -setcookie rabbit  
    Erlang/OTP 21 [erts-10.1] [source] [64-bit] [smp:22:7] [ds:22:7:10] [async-threads:1]  
    
    Eshell V10.1  (abort with ^G)  
    (rabbit@rmq-test-rabbitmq-1)1> recon_trace:calls({rabbit_mgmt_wm_queues, to_json, 2}, 1).  
    1  
    
    11:0:48.464423 <0.1294.15> rabbit_mgmt_wm_queues:to_json(#{bindings => #{},body_length => 0,cert => undefined,charset => undefined,  
      has_body => false,  
      headers =>  
          #{<<"accept">> => <<"*/*">>,  
            <<"authorization">> => <<"Basic Z3Vlc3Q6Z3Vlc3Q=">>,  
            <<"host">> => <<"10.100.10.140:53553">>,  
            <<"user-agent">> => <<"curl/7.54.0">>},  
      host => <<"10.100.10.140">>,host_info => undefined,  
      media_type => {<<"application">>,<<"json">>,[]},  
      method => <<"GET">>,path => <<"/api/queues">>,path_info => undefined,  
      peer => {{10,100,10,4},54136},  
      pid => <0.1293.15>,port => 53553,qs => <<"columns=name,messages_ready">>,  
      ref => rabbit_web_dispatch_sup_15672,  
      resp_headers =>  
          #{<<"content-security-policy">> => <<"default-src 'self'">>,  
            <<"content-type">> => [<<"application">>,<<"/">>,<<"json">>,<<>>],  
            <<"vary">> =>  
                [<<"accept">>,  
                 [<<", ">>,<<"accept-encoding">>],  
                 [<<", ">>,<<"origin">>]]},  
      scheme => <<"http">>,  
      sock => {{172,17,0,4},15672},  
      streamid => 1,version => 'HTTP/1.1'}, {context,{user,<<"guest">>,  
                   [administrator],  
                   [{rabbit_auth_backend_internal,none}]},  
             <<"guest">>,undefined})  
    Recon tracer rate limit tripped. 
    

    The snippet above shows that we first enabled tracing for rabbit_mgmt_wm_queues:to_json/2, then manually sent a request to the Management API (using curl; not visible in the snippet), and that generated the trace event. That is how we found our entry point for deeper analysis.

    Using flame graphs

    Once we had found the function that serves the requests, we could check how that function spends its execution time. The ideal technique for doing this is flame graphs. One of their definitions states that:

    Flame graphs are a visualization of profiled software, allowing the most frequent code-paths to be identified quickly and accurately.

    In our case, we could use flame graphs to visualise the function’s call stack; in other words, which functions are called inside a traced function, and how much time it takes (relative to the traced function’s execution time) for these functions to execute. This visualisation helps to quickly identify the suspicious functions in the code.

    For Erlang there is a library called eflame that has tools both for collecting traces from an Erlang system and for building a flame graph from the data. But how do we inject that library into Rabbit for our load test?

    Building a custom RabbitMQ Docker image

    As we mentioned previously, all services in our load testing platform run inside Docker containers. Hence, we had to build a custom RabbitMQ Docker image with the eflame library included in the server code. We created a RabbitMQ-docker repository that makes it easy to build a Docker image with modified RabbitMQ source code.

    Profiling with eflame

    Once we had a modified RabbitMQ Docker image with eflame included, we could run another load test (the specification was the same as in the previous test) and start the actual profiling. These were the results:

    We performed several measurements and got two kinds of results, as presented above. The main difference between these graphs lies in the rabbit_mgmt_util:run_run_augmentation/2 function. What does that difference mean?

    From the results of the previous load tests and manual code analysis, we know that slow and fast queries exist. The slow requests can take up to twenty seconds, while the fast ones take only a few seconds. This matches the query time chart above, with a 50th percentile of about 1.5 seconds but the 95th (and higher) percentiles reaching up to 20 seconds. Moreover, we manually measured the execution time of both cases using timer:tc/3 and the results were consistent.

    This happens because there is a cache in the Management plugin. When the cache is valid, requests are served much faster, as the data has already been collected, but when it is invalid, all the necessary information has to be gathered.

    Even though the graphs have the same length in the image, they represent different execution times (fast vs slow). Hence it is hard to guess which graph shows which query without actually taking a measurement. The first graph shows a fast query, while the second shows a slow one. In the slow-query graph, the rabbit_mgmt_util:augment/2 -> rabbit_mgmt_db:submit_cached/4 -> gen_server:call/3 -> … stack takes so much time because the cache is invalid and fresh data has to be collected. So what happens when the data is collected?

    Profiling with fprof

    You might ask: “why don’t we see the data collection functions in the flame graphs?” This happens because the cache is implemented as another Erlang process, and the data collection happens inside the cache process. There is a gen_server:call/3 function visible in the graphs that makes a call to the cache process and waits for a response. Depending on the cache state (valid or invalid), a response can come back quickly or slowly.
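
The call-into-a-cache-process pattern described above can be sketched as a small GenServer. This is an illustrative model only; the module name, TTL handling and state shape are ours, not RabbitMQ's actual implementation:

```elixir
defmodule StatsCache do
  use GenServer

  # Callers block in GenServer.call/3 while this process either replies
  # immediately from a still-valid cache or re-runs the slow collection.
  def start_link(collect_fun, ttl_ms) do
    GenServer.start_link(__MODULE__, {collect_fun, ttl_ms})
  end

  def fetch(pid), do: GenServer.call(pid, :fetch, :infinity)

  @impl true
  def init({collect_fun, ttl_ms}) do
    {:ok, %{collect: collect_fun, ttl: ttl_ms, data: nil, collected_at: nil}}
  end

  @impl true
  def handle_call(:fetch, _from, state) do
    now = System.monotonic_time(:millisecond)

    if state.data != nil and now - state.collected_at < state.ttl do
      # Cache valid: fast reply, no collection work.
      {:reply, state.data, state}
    else
      # Cache invalid: the caller waits for the whole collection to finish.
      data = state.collect.()
      {:reply, data, %{state | data: data, collected_at: now}}
    end
  end
end
```

Whether fetch/1 returns quickly or slowly depends entirely on whether the cached data is still valid, which matches the bimodal (fast vs slow) query times observed in the load tests.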

    The data collection is implemented in the rabbit_mgmt_db:list_queue_stats/3 function, which is invoked from the cache process. Naturally, we should profile that function. We tried eflame and, after several dozen minutes, this is the result we got:

    eheap_alloc: Cannot allocate 42116020480 bytes of memory (of type "old_heap").
    

    The Erlang heap memory allocator tried to allocate 42 GB of memory (in fact, the space was needed for the garbage collector to operate) and crashed the server. As eflame takes advantage of Erlang tracing to generate flame graphs, it was probably overloaded by the number of trace events generated by the traced function. That is where fprof comes into play.

    According to the official Erlang documentation, fprof is:

    a time profiling tool that uses trace to file for minimal runtime performance impact.

    That is very true. The tool handled the data collection function without any problems, although it took several minutes to produce the result. The output was quite big, so only the crucial lines are listed below:

    (rabbit@rmq-test-rabbitmq-1)96> fprof:apply(rabbit_mgmt_db, list_queue_stats, [RA, B, 5000]).  
    ...
    (rabbit@rmq-test-rabbitmq-1)97> fprof:profile().  
    ...
    (rabbit@rmq-test-rabbitmq-1)98> fprof:analyse().  
    ...
    %                                       CNT        ACC       OWN  
    {[{{rabbit_mgmt_db,'-list_queue_stats/3-lc$^1/1-1-',4}, 803,391175.593,  105.666}],  
     { {rabbit_mgmt_db,queue_stats,3},              803,391175.593,  105.666},     %  
     [{{rabbit_mgmt_db,format_range,4},            3212,390985.427,   76.758},  
      {{rabbit_mgmt_db,pick_range,2},              3212,   58.047,   34.206},  
      {{erlang,'++',2},                            2407,   19.445,   19.445},  
      {{rabbit_mgmt_db,message_stats,1},            803,    7.040,    7.040}]}.  
    

    The output consists of many entries like this one. The function marked with the % character is the one the current entry concerns. The functions listed below it are the ones that were called from the marked function. The third column (ACC) shows the total execution time of the marked function (the function’s own execution time plus that of its callees) in milliseconds. For example, in the entry above, the total execution time of the rabbit_mgmt_db:pick_range/2 function is 58.047 ms. For a thorough explanation of the fprof output, check the official fprof documentation.

    The entry above is the top-level entry concerning rabbit_mgmt_db:queue_stats/3, which was called from the traced function. That function spent most of its execution time in the rabbit_mgmt_db:format_range/4 function. We can go to the entry concerning that function and check where it, in turn, spent its execution time. This way we can work through the output and find the potential causes of the Management API slowness issue.

    Reading the fprof output top-down, we arrived at this entry:

    {[{{exometer_slide,'-sum/5-anonymous-6-',7},   3713,364774.737,  206.874}],
     { {exometer_slide,to_normalized_list,6},      3713,364774.737,  206.874},     %
     [{{exometer_slide,create_normalized_lookup,4},3713,213922.287,   64.599}, %% SUSPICIOUS
      {{exometer_slide,'-to_normalized_list/6-lists^foldl/2-4-',3},3713,145165.626,   51.991}, %% SUSPICIOUS
      {{exometer_slide,to_list_from,3},            3713, 4518.772,  201.682},
      {{lists,seq,3},                              3713,  837.788,   35.720},
      {{erlang,'++',2},                            3712,   70.038,   70.038},
      {{exometer_slide,'-sum/5-anonymous-5-',1},   3713,   51.971,   25.739},
      {garbage_collect,                               1,    1.269,    1.269},
      {suspend,                                       2,    0.151,    0.000}]}.  
    

    The entry concerns the exometer_slide:to_normalized_list/6 function, which in turn called two “suspicious” functions from the same module. Digging deeper, we found this:

    {[{{exometer_slide,'-create_normalized_lookup/4-anonymous-2-',5},347962,196916.209,35453.182},
      {{exometer_slide,'-sum/5-anonymous-4-',2},   356109,16625.240, 4471.993},
      {{orddict,update,4},                         20268881,    0.000,172352.980}],
     { {orddict,update,4},                         20972952,213541.449,212278.155},     %
     [{suspend,                                    9301,  682.033,    0.000},
      {{exometer_slide,'-sum/5-anonymous-3-',2},   31204,  420.574,  227.727},
      {garbage_collect,                              99,  160.687,  160.687},
      {{orddict,update,4},                         20268881,    0.000,172352.980}]}. 
    

    and

       {[{{exometer_slide,'-to_normalized_list/6-anonymous-5-',3},456669,133229.862, 3043.145},
      {{orddict,find,2},                           19369215,    0.000,129761.708}],
     { {orddict,find,2},                           19825884,133229.862,132804.853},     %
     [{suspend,                                    4754,  392.064,    0.000},
      {garbage_collect,                              22,   33.195,   33.195},
      {{orddict,find,2},                           19369215,    0.000,129761.708}]}.  
    

    Most of the execution time was consumed by the functions orddict:update/4 and orddict:find/2 . Combined, the two functions accounted for 86% of the total execution time.

    This led us to the exometer_slide module of the RabbitMQ Management Agent plugin. If you examine the module, you will find all of the functions mentioned above and the connections between them.
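The dominance of these two calls is explained by orddict’s representation: an orddict is a plain list of key-value pairs sorted by key, so every find/2 and update/4 walks the list linearly. A minimal sketch of that API (written in Elixir for brevity, calling the same Erlang module; the toy data here is ours, not from the trace):

```elixir
# :orddict stores pairs as a sorted list, so find/2 and update/4
# are O(n) scans — cheap for small dicts, costly at the call
# volumes fprof reported (tens of millions of calls).
dict =
  Enum.reduce(1..1_000, :orddict.new(), fn k, acc ->
    :orddict.store(k, k * 2, acc)
  end)

{:ok, 84} = :orddict.find(42, dict)

# update/4 takes an update fun and an initial value for missing keys
dict2 = :orddict.update(42, fn v -> v + 1 end, 0, dict)
{:ok, 85} = :orddict.find(42, dict2)
```

At a few entries this cost is negligible; multiplied by the roughly twenty million calls seen in the profile, it adds up to the minutes-long queries observed.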

    We decided to close the investigation at this stage, because this was clearly the problem. Now that we have shared our thoughts on the issue with the community in this blog, who knows, maybe we will find a new solution together.

    The observer effect

    There is one final thing that is essential to consider when debugging or observing systems: the observer effect. The observer effect is the theory that merely monitoring a phenomenon changes that phenomenon.

    In our example, we used tools that rely on tracing. Tracing has an impact on a system, since it generates, sends, and processes many events.

    The execution times of the functions mentioned above increased considerably when they were called with profiling enabled. Plain calls took a few seconds, while the same calls with profiling enabled took several minutes. However, the difference between the slow and the fast queries appeared to remain unchanged.

    The observer effect was not evaluated within the scope of the experiment described in this blog post.

    A workaround

    The problem can also be solved in a slightly different way. Let’s consider for a moment whether there is another way to obtain queue names together with the number of messages in them. There is the function rabbit_amqqueue:emit_info_all/5 , which lets us retrieve exactly the information we are interested in, directly from a queue process. We could use that API from a custom RabbitMQ plugin and expose an HTTP endpoint that serves the data when queried.

    We turned that idea into reality and built a proof-of-concept plugin called rabbitmq-queue-info that does exactly what is described above. The plugin was even load-tested (the test specification was exactly the same as for the management plugin, as described earlier in this post). The results are shown below and speak for themselves:

    Want more?

    Want to learn more about tracing in RabbitMQ, Erlang, and Elixir? Check out WombatOAM, an intuitive system that makes it easy to monitor and maintain your systems. Get your free 45-day trial of WombatOAM now.

    Appendix

    RabbitMQ version 3.7.9 was used in all the load tests mentioned in this blog post. Special thanks to Szymon Mentel and Andrzej Teleżyński for all their help with this post.

    Our work with RabbitMQ.

    The post Cómo depurar tu RabbitMQ appeared first on Erlang Solutions .


      Erlang Solutions: Here’s Why You Should Build Scalable and Concurrent Applications with Elixir

      news.movim.eu / PlanetJabber • 28 March, 2023 • 4 minutes

    In today’s world, when dealing with high levels of system requests, you need applications that can handle them without slowing down. Here’s where Elixir comes in. Elixir is a programming language designed to create highly scalable and concurrent applications. It is built on Erlang’s virtual machine (BEAM), which has been used for decades to build highly reliable and scalable systems.

    Keep reading and I’ll explain what makes Elixir so useful for businesses and the benefits there are to such a scalable system.

    A bit of background on Elixir

    Elixir was created in 2012 by Ruby developer Jose Valim. The Ruby programming language had long been considered the standard for developing enterprise apps because it is well-built and has a great framework. But Ruby was built at a time when systems did not face the demands they do now. Today, applications often run into issues with concurrency and with scaling up.

    Valim wanted to enable higher extensibility and productivity for use in building large-scale sites and apps. For this, he turned to the older Erlang programming language . Erlang was built as a telecom solution with massive concurrency and the ability to handle millions of phone call connections. Building on top of Erlang and combining it with the benefits of Ruby led to the high-concurrency, low-latency language we know today. Elixir is now used by a wide variety of companies, including Discord, Pinterest, Moz, and more.

    Why businesses are adopting Elixir

    So why are businesses making the switch?

    Elixir-based development produces services that can handle substantially more traffic. You’ll have a platform that can expand and scale swiftly without compromising dependability, all while enhancing overall performance. More customers, more sales, and a higher return on investment (ROI) have proven to be big benefits.

    But don’t just take our word for it, have a look at some of our clients who are thriving after moving their systems.

    Building scalable and concurrent applications with Elixir

    Scalability and concurrency are crucial aspects of modern-day applications. With Elixir, you can build applications that can handle a large number of requests, without compromising performance.

    Its ability to run multiple processes concurrently enables developers to build highly scalable applications. The concurrency model also allows developers to create lightweight processes that can communicate with each other seamlessly. Elixir also provides a distributed environment, which allows developers to build applications that can scale horizontally, ideal for accommodating rapid business growth.
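To make “lightweight” concrete: each BEAM process starts with a tiny heap, so spawning tens of thousands of them is routine. A minimal sketch (the process count and message shapes are our own illustration):

```elixir
# Spawn 10_000 processes; each waits for one message and replies.
# On the BEAM this completes in a fraction of a second.
parent = self()

pids =
  for i <- 1..10_000 do
    spawn(fn ->
      receive do
        {:ping, from} -> send(from, {:pong, i})
      end
    end)
  end

Enum.each(pids, fn pid -> send(pid, {:ping, parent}) end)

replies =
  for _ <- 1..10_000 do
    receive do
      {:pong, _i} -> :ok
    end
  end

IO.puts("received #{length(replies)} replies")
```

The same pattern scales to hundreds of thousands of processes per node, which is what makes per-connection or per-request processes practical.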

    More about the Actor Model

    Elixir’s concurrency model is based on the Actor model, which provides a message-passing system between processes.

    Source: Lightbend

    The “Actor Model” is a way of doing many things at the same time. It uses actors as its basic building blocks. Think of them as little machines that can do things independently of one another and that talk to each other by sending messages. Each of these little machines is called a “process”.

    This way of working makes it easy to build systems that can handle multiple things happening at once, even when issues occur.
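The description above can be sketched directly in Elixir: a process that owns a piece of state and mutates it only in response to messages, with no shared memory and no locks. The Counter module and its message protocol are our own illustration:

```elixir
# A tiny "actor": a process that owns a counter and changes it
# only in response to messages.
defmodule Counter do
  def loop(n) do
    receive do
      {:increment, by} ->
        loop(n + by)

      {:get, from} ->
        send(from, {:count, n})
        loop(n)
    end
  end
end

counter = spawn(Counter, :loop, [0])
send(counter, {:increment, 2})
send(counter, {:increment, 3})
send(counter, {:get, self()})

receive do
  {:count, n} -> IO.puts("count is #{n}")  # prints "count is 5"
end
```

Because the mailbox serializes messages, the counter can be updated from any number of senders without race conditions.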

    Leveraging Elixir’s ecosystem for scalable and concurrent applications

    Elixir has a vast ecosystem of libraries and frameworks that can help developers build scalable and concurrent applications. One of the most popular frameworks is Phoenix. It provides features such as real-time communication, web sockets, and channels, which make it an ideal choice for building scalable and concurrent web applications.

    Elixir also ships with building blocks such as GenServer, which provides a simple and powerful way to build concurrent applications.
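A GenServer wraps the receive-loop pattern behind standard callbacks. This minimal stack server (our own example, using only the standard GenServer behaviour) shows the split between the client API and the server callbacks:

```elixir
defmodule Stack do
  use GenServer

  # Client API — runs in the caller's process
  def start_link(initial), do: GenServer.start_link(__MODULE__, initial)
  def push(pid, item), do: GenServer.cast(pid, {:push, item})
  def pop(pid), do: GenServer.call(pid, :pop)

  # Server callbacks — run in the GenServer process
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast({:push, item}, state), do: {:noreply, [item | state]}

  @impl true
  def handle_call(:pop, _from, [head | tail]), do: {:reply, head, tail}
end

{:ok, pid} = Stack.start_link([])
Stack.push(pid, :a)
Stack.push(pid, :b)
Stack.pop(pid)  # => :b
```

Casts are fire-and-forget, calls are synchronous request/reply; the state threads through the callbacks, so the module itself holds no mutable data.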

    The ecosystem also includes Mix, a build tool that automates many of the tasks involved in creating Elixir applications. Mix provides tasks for creating new projects, testing, and deploying applications. It is also extensible, allowing developers to create their own tasks and plugins.

    Fault-Tolerance

    Elixir’s supervisor mechanism allows developers to build fault-tolerant applications that can recover from failures automatically. Elixir’s processes are isolated from each other, which means that if a process fails, it does not affect the entire system. Developers can also use Elixir’s built-in error handling mechanisms to handle errors gracefully.

    Fault tolerance systems. Source: Finematics
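The restart behaviour can be seen in a few lines. In this sketch (the :counter Agent and the kill are our own illustration), a one_for_one supervisor restarts a crashed child without touching its siblings:

```elixir
# An Agent holding a counter, supervised with the one_for_one strategy.
children = [
  %{
    id: :counter,
    start: {Agent, :start_link, [fn -> 0 end, [name: :counter]]}
  }
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

Agent.update(:counter, &(&1 + 1))
1 = Agent.get(:counter, & &1)

# Simulate a crash: kill the Agent process outright.
Process.exit(Process.whereis(:counter), :kill)
Process.sleep(100)

# The supervisor has restarted the Agent with its initial state.
0 = Agent.get(:counter, & &1)
```

Note that the restarted process comes back with its initial state; any state worth keeping across crashes belongs somewhere durable, not in the crashing process itself.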

    Elixir is easy to learn

    A major draw of Elixir also lies in its simplicity. It has a simple, easy-to-learn syntax that is, again, a big plus for developers. It is also a productive language: it can accomplish a lot with minimal code.

    The Elixir community

    Despite Elixir’s relative newness compared to other languages, the fast-growing Elixir community is very supportive, continually creating libraries and code that keep the ecosystem solid and robust.

    The Elixir revolution

    As digital transformation continues to reinvent business models, Elixir has become a growing choice for businesses looking for ways to differentiate themselves in complex technology markets.

    We are now in the age where companies are eager to find cutting-edge technologies that will revolutionise how users interact with their applications. If you’re looking to build scalable and concurrent applications, Elixir is definitely worth considering. If you’d like to learn more about Elixir, check out our page .

    The post Here’s Why You Should Build Scalable and Concurrent Applications with Elixir appeared first on Erlang Solutions .

      www.erlang-solutions.com/blog/heres-why-you-should-build-scalable-and-concurrent-applications-with-elixir/


      Alexander Gnauck: XmppDotNet announcement

      news.movim.eu / PlanetJabber • 22 March, 2023

    I want to announce the availability of the XmppDotNet XMPP library. XmppDotNet is the new name and next generation of our MatriX vNext XMPP library .

    Why change the name?

    It was never intended to keep vNext in the name forever, and there is a lot of confusion between MatriX and MatriX vNext among some of our existing customers. Most of them expect the two libraries to be fully API compatible, or at least expect a very simple upgrade path.
    But this was never the case, and there are no plans to publish API-compatible wrappers. Development of the MatriX XMPP library started over two decades ago, when XMPP was still known as Jabber. While Jabber/XMPP has evolved a lot over the years, the same applies to the .NET technologies.

    Most of the code in XmppDotNet has been rewritten, and the API and architecture are completely redesigned. It targets .NET Core only. While many legacy protocols and extensions are still implemented and working, the focus is on modern XMPP and its extensions.

    The license is currently GPL v3. But there are plans to switch XmppDotNet to a less restrictive license.

      www.ag-software.net/2023/03/22/xmppdotnet-announcement/


      Ignite Realtime Blog: Release v1.1.0 of the MUC Real-Time Block List plugin for Openfire

      news.movim.eu / PlanetJabber • 18 March, 2023 • 1 minute

    We are happy to announce the immediate availability of a new version of the MUC Real-Time Block List plugin for Openfire , our cross-platform real-time collaboration server based on the XMPP protocol! This plugin can help you moderate your chat rooms, especially when your service is part of a larger network of federated XMPP domains.

    From experience, the XMPP community has learned that bad actors tend to spam a wide range of public chat rooms on an equally wide range of different domains. Prior to the functionality provided by this plugin, the administrator of each MUC service had to manually adjust permissions, to keep unwanted entities out. With this new plugin, that process is automated.

    In this new release, several small bugs were fixed, and new features were introduced, including:

    • The plugin now, by default, uses a block list as maintained on https://xmppbl.org/
    • Support for blocking full domains (rather than just individual users) has been added
    • Block list entries no longer disappear over time

    The updated plugin should become available for download in your Openfire admin console in the course of the next few hours. Alternatively, you can download the plugin directly from the plugin’s archive page .

    For other release announcements and news, follow us on Twitter and Mastodon .


      discourse.igniterealtime.org/t/release-v1-1-0-of-the-muc-real-time-block-list-plugin-for-openfire/92673