phone

    • chevron_right

      ProcessOne: ejabberd 23.01

      news.movim.eu / PlanetJabber • 17 January, 2023 • 5 minutes

    Two months after the previous release, ejabberd 23.01 includes many bug fixes, several improvements and some new features.

    A new module, mod_mqtt_bridge , can be used to replicate changes to MQTT topics between local and remote servers.

    A more detailed explanation of those topics and other features:

    Erlang/OTP 19.3 discouraged

    Remember that support for Erlang/OTP 19.3 is discouraged, and will be removed in a future release. Please upgrade to Erlang/OTP 20.0 or newer. Check more details in the ejabberd 22.10 release announcement .

    New MQTT bridge

    This new module allows synchronizing topic changes between local and remote servers. It can be configured to replicate local changes to remote server, or can subscribe to topics on remote server and update local copies when they change.

    When connecting to a remote server you can use native or websocket encapsulated protocol, and you can connect using both v4 and v5 protocol. It can authenticate using username/password pairs or with client TLS certificates.

    New Hooks

    Regarding MQTT support, there are several new hooks:

    • mqtt_publish : New hook for MQTT publish event
    • mqtt_subscribe and mqtt_unsubscribe : New hooks for MQTT subscribe & unsubscribe events

    New option log_modules_fully

    The loglevel top-level option specifies the verbosity of log files generated by ejabberd.

    If you want some specific modules to log everything, independently from whatever value you have configured in loglevel , now you can use the new log_modules_fully option.

    For example, if you are investigating some problem in ejabberd_sm and mod_client_state :

    loglevel: warning
    log_modules_fully: [ejabberd_sm, mod_client_state]
    

    (This option works only on systems with Erlang 22 or newer).

    Changes in option outgoing_s2s_families

    The outgoing_s2s_families top-level option specifies which address families to try, in what order.

    The default value has now been changed to try IPv6 first, as servers are within data centers where IPv6 is more commonly enabled (contrary to clients). And if it’s not present, then it’ll just fall back to IPv4.

    By the way, this option is obsolete and irrelevant when using ejabberd 23.01 and Erlang/OTP 22, or newer versions of them.

    Changes in option captcha_cmd

    The captcha_cmd top-level option specifies the full path to a script that can generate a CAPTCHA image. Now this option may specify an Erlang module name, which should implement a function to generate a CAPTCHA image.

    ejabberd does not include any such module, but there are two available in the ejabberd-contrib repository that you can install and try: mod_ecaptcha and mod_captcha_rust .

    DOAP file

    The protocols implemented or supported by ejabberd are defined in the corresponding source code modules since ejabberd 15.06. Until now, only the XEP number and supported version were tracked. Since now, it’s possible to document what ejabberd version first implemented it, the implementation status and an arbitrary comment.

    That information until now was only used by the script tools/check_xep_versions.sh . A new script is added, tools/generate-doap.sh , to generate a DOAP file with that information. A new tarket is added to Makefile: make doap .

    And that DOAP file is now published as ejabberd.doap in the git repository. That file is read by the XMPP.org website to show ejabberd’s protocols, see XMPP Servers: ejabberd .

    VSCode

    Support for Visual Studio Code and variants is vastly improved. Thanks to the Erlang LS VSCode extension , the ejabberd git repository includes support for developing, compiling and debugging ejabberd with Visual Studio Code, VSCodium, Coder’s code-server and Github Codespaces.

    See more details in the ejabberd Docs: VSCode page.

    ChangeLog

    General

    • Add misc:uri_parse/2 to allow declaring default ports for protocols
    • CAPTCHA: Add support to define module instead of path to script
    • Clustering: Handle mnesia_system_event mnesia_up when other node joins this ( #3842 )
    • ConverseJS: Don’t set i18n option because Converse enforces it instead of browser lang ( #3951 )
    • ConverseJS: Try to redirect access to files mod_conversejs to CDN when there is no local copies
    • ext_mod: compile C files and install them in ejabberd’s priv
    • ext_mod: Support to get module status from Elixir modules
    • make-binaries: reduce log output
    • make-binaries: Bump zlib version to 1.2.13
    • MUC: Don’t store mucsub presence events in offline storage
    • MUC: hibernation_time is not an option worth storing in room state ( #3946 )
    • Multicast: Jid format when multicastc was cached ( #3950 )
    • mysql: Pass ssl options to mysql driver
    • pgsql: Do not set standard_conforming_strings to off ( #3944 )
    • OAuth: Accept jid as a HTTP URL query argument
    • OAuth: Handle when client is not identified
    • PubSub: Expose the pubsub#type field in disco#info query to the node ( #3914 )
    • Translations: Update German translation

    Admin

    • api_permissions : Fix option crash when doesn’t have who: section
    • log_modules_fully : New option to list modules that will log everything
    • outgoing_s2s_families : Changed option’s default to IPv6, and fall back to IPv4
    • Fix bash completion when using Relive or other install methods
    • Fix portability issue with some shells ( #3970 )
    • Allow admin command to subscribe new users to members_only rooms
    • Use alternative split/2 function that works with Erlang/OTP as old as 19.3
    • Silent warning in OTP24 about not specified cacerts in SQL connections
    • Fix compilation warnings with Elixir 1.14

    DOAP

    • Support extended -protocol erlang attribute
    • Add extended RFCs and XEP details to some protocol attributes
    • tools/generate-doap.sh : New script to generate DOAP file, add make doap ( #3915 )
    • ejabberd.doap : New DOAP file describing ejabberd supported protocols

    MQTT

    • Add MQTT bridge module
    • Add support for certificate authentication in MQTT bridge
    • Implement reload in MQTT bridge
    • Add support for websockets to MQTT bridge
    • Recognize ws5/wss5 urls in MQTT bridge
    • mqtt_publish : New hook for MQTT publish event
    • mqtt_(un)subscribe : New hooks for MQTT subscribe & unsubscribe events

    VSCode

    • Improve .devcontainer to use use devcontainer image and .vscode
    • Add .vscode files to instruct VSCode how to run ejabberd
    • Add Erlang LS default configuration
    • Add Elvis default configuration

    Full Changelog

    https://github.com/processone/ejabberd/compare/22.10…23.01

    ejabberd 23.01 download & feedback

    As usual, the release is tagged in the Git source code repository on GitHub .

    The source package and installers are available in ejabberd Downloads page. To check the *.asc signature files, see How to verify ProcessOne downloads integrity .

    For convenience, there are alternative download locations like the ejabberd DEB/RPM Packages Repository and the GitHub Release / Tags .

    The Docker image is in Docker Hub , and there’s an alternative Container image in GitHub Packages .

    If you suspect that you’ve found a bug, please search or fill a bug report on GitHub Issues .

    The post ejabberd 23.01 first appeared on ProcessOne .
    • chevron_right

      Paul Schaub: Use Any SOP Binary With SOP-Java and External-SOP

      news.movim.eu / PlanetJabber • 13 January, 2023 • 1 minute

    The Stateless OpenPGP Protocol specification describes a shared, standardized command line interface for OpenPGP applications. There is a bunch of such binaries available already, among them PGPainless’ pgpainless-cli , Sequoia-PGP’s sqop , as well as ProtonMails gosop . These tools make it easy to use OpenPGP from the command line, as well as from within bash scripts (both pgpainless-cli and sqop are available in Debian testing or the main repo) and the standardized interface allows users to switch from one backend to the other without the need to rewrite their scripts.

    The Java library sop-java provides a set of interface definitions that define a java API that closely mimics the command line interface. These interfaces can be implemented by anyone, such that developers could create a drop-in for sop-java using the OpenPGP library of their choice. One such backend is pgpainless-sop , which implements sop-java using the PGPainless library.

    I just released another library named external-sop , which implements sop-java and allows the user to use any SOP CLI application of their choice from within their Java / Kotlin application!

    Let’s assume we have a SOP command line application called example-sop and we want to make use of it within our Java application. external-sop makes the integration a one-liner:

    SOP sop = new ExternalSOP("/usr/bin/example-sop");

    Now we can use the resulting sop object the same way we would use for example a SOPImpl instance:

    // generate key
    byte[] keyBytes = sop.generateKey()
            .userId("John Doe <john.doe@pgpainless.org>")
            .withKeyPassword("f00b4r")
            .generate()
            .getBytes();
    
    // extract certificate
    byte[] certificateBytes = sop.extractCert()
            .key(keyBytes)
            .getBytes();
    
    byte[] plaintext = "Hello, World!\n".getBytes(); // plaintext
    
    // encrypt and sign a message
    byte[] ciphertext = sop.encrypt()
            // encrypt for each recipient
            .withCert(certificateBytes)
            // Optionally: Sign the message
            .signWith(keyBytes)
            .withKeyPassword("f00b4r") // if signing key is protected
            // provide the plaintext
            .plaintext(plaintext)
            .getBytes();
    
    // decrypt and verify a message
    ByteArrayAndResult<DecryptionResult> bytesAndResult = sop.decrypt()
            .withKey(keyBytes)
            .verifyWithCert(certificateBytes)
            .withKeyPassword("f00b4r") // if decryption key is protected
            .ciphertext(ciphertext)
            .toByteArrayAndResult();
    
    DecryptionResult result = bytesAndResult.getResult();
    byte[] plaintext = bytesAndResult.getBytes();

    The external-sop module will be available on Maven Central in a few hours for you to test.

    Happy Hacking!

    • wifi_tethering open_in_new

      This post is public

      blog.jabberhead.tk /2023/01/13/use-any-sop-binary-with-sop-java-and-external-sop/

    • chevron_right

      Erlang Solutions: Building a Remote Control Car from Scratch Using Elixir

      news.movim.eu / PlanetJabber • 12 January, 2023 • 8 minutes

    Introduction

    Elixir is undoubtedly one of the most comprehensive full stack languages available, offering battle-tested reliability and fault-tolerance on the backend. This is thanks to its origins in Erlang, the BEAM VM and OTP, powerful and agile frontend development thanks to LiveView and the ability to write to hardware with Nerves (not to mention the exciting developments happening in the machine learning space).

    Our Americas office created a project that takes full advantage of that fullstack capability- a remote control car that can be controlled from your phone. It has all the components from the car to the app, controlled and written in Elixir.

    Here’s how they did it.

    Background

    During ElixirConf , we set a Scalextric racetrack at our sponsor booth where people meeting us were able to play around with the race cars. It’s a fun way to encourage people to come to the stand, but we felt that something was missing, there was no connection between the fun we had on the stand and the languages we love (Erlang and Elixir).

    So we thought it would be cool to assemble our own remote car using Elixir. We went ahead and got rid of the cables and the track, which were physical restrictions to the fun we envisioned.

    That’s how the idea was born.

    The initial implementation was for us to gain more knowledge about Nerves and IoT in general. Our approach was to assemble some RaspberryPi with a motor driver and see if we could control the car over WiFi.

    This is when we decided to start a very rough car prototype to see how easy it was to get the whole project running in Elixir.

    Requirements

    We wanted to ensure we only used Elixir / Erlang Ecosystem in our stack:

    • Erlang/OTP 25.1.2

    • Elixir 1.14.2

    You also need the Nerves bootstrap mix archive in order to create the project scaffold and provide deeper integration within mix.

    mix archive.install hex nerves_bootstrap

    Theory

    Let’s first recap some theory and concepts:

    The basic idea is for us to move a pair of wheels. In order to do that, we need a device that is capable of power and can control a couple of motors. We decided to use a L298n motor driver that is easily available in the local electronics stores.

    The L298n is a device that can power up to two motors and is able to control their speed by issuing PWM commands.

    L298N module pinout

    We powered the device using four AA rechargeable batteries that are connected via 12v and GND pins.

    We also needed to know that for moving the wheels, we had to write GPIO commands to IN1, IN2, IN3 and IN4, while controlling the speed via PWM over the pins ENA and ENB (motor A and B respectively).

    At the end of the day, we had this circuit implemented:

    Starting the project

    We started with a blank project and chassis:

    First, we start with a blank Nerves project that will give us the scaffold we need:

    export MIX_TARGET=rpi4
    mix nerves.new jaguar
    

    Before we compiled the project, we added a couple of dependencies that we needed:

    # ...
    {:vintage_net, "~> 0.12", targets: @all_targets},
    {:vintage_net_wifi, "~> 0.11.1", targets: @all_targets},
    {:pigpiox, "~> 0.1", targets: @all_targets},
    # ...
    

    The dependencies above helped us with WiFi connectivity and GPIO / PWM commands.

    2.2 First steps

    Now that we had all dependencies in place we can proceed to compile the project:

    mix do deps.get, compile

    We now needed to focus on how to make the wheels move. At first, you might have to do some scaffolding to test your setup:

    The motors themselves don’t have any polarity, so there is no risk of magic smoke. But keep this in mind in case your wheels spin backwards.

    Now, let’s connect the other components and give it a try.

    Tune in

    After having a working setup, we need to connect to the outside world. We provided a very naive and basic way to connect to the back-end via TCP. But first, we need to make sure our device can connect to the internet at startup.

    Nerves has a third-party library that deals with networking setup and also provides WiFi configuration utilities. There are different ways to set this up but for simplicity, we are going to set the configuration statically in your config/target.exs file:

    config :vintage_net,
      config: [			
    {"wlan0", %{
           type: VintageNetWiFi,
            vintage_net_wifi: %{
    
    					
             networks: [
                %{
    
    					
                 key_mgmt: :wpa_psk,
                  ssid: "my_network_ssid",
                  psk: "a_passphrase_or_psk",
    } ]
    					
    },
    					
           ipv4: %{method: :dhcp}
          }					
    } ] 
    

    For more information about the different configuration options and setup choices, refer to the documentation.

    Once your WiFi connectivity is configured, we need to make sure that we can connect to the back-end via TCP. To do so, just create a new GenServer that connects via :gen_tcp at initialization, pretty much like this:

    ## GenServer callbacks
    @impl true
    def init(opts) do
      backend = Keyword.get(opts, :backend, "localhost") |> to_charlist()
      port = Keyword.get(opts, :port, 5000)
      state = %{socket: nil, backend: backend, port: port}
      {:ok, state, {:continue, :connect}}
    End
    
    @impl true
    def handle_continue(:connect, state) do
      Logger.info("connecting to #{inspect(state.backend)}")
      {:ok, socket} = :gen_tcp.connect(state.backend, state.port, keepalive: true)
      _ref = Port.monitor(socket)
      {:noreply, %{state | socket: socket}}
    end 
    #...
    

    Powering the device

    There is not much to this as we use a standard MakerHawk Raspberry Pi UPS that fits right down the Raspberry Pi. It is powered by two rechargeable 18650 Li-PO batteries. This hat also works as a charging device.

    Moving the wheels

    Moving the wheels is a straightforward process. We only need to take into consideration the PIN layout that we are using for communicating with the motor driver. In this case we are using the following layout:

    IN1 – GPIO 24 (A forward)

    IN2 – GPIO 23 (A backward)

    IN3 – GPIO 17 (B forward)

    IN4 – GPIO 22 (B backward)

    ENA – GPIO 25 (A speed)

    ENB – GPIO 27 (B speed)

    The PIN numbers correspond to the pinout layout for Raspberry Pi 4 model B.

    With the pins wired, we can now issue some commands to prepare the pins for output and set the initial motor speed:

    Enum.map([in1, in2, in3, in4], fn pin ->
      Pigpiox.GPIO.set_mode(pin, :output)
      Pigpiox.GPIO.write(pin, 0) # Stop
    end)
    speed = 250
    Enum.map([ena, enb], fn pin ->
      Pigpiox.GPIO.set_mode(pin, :output)
      Pigpiox.Pwm.gpio_pwm(pin, speed)
    end)
    

    After setup, we can change the speed of the motors on the fly:

    speed = 200
    :ok = Pigpiox.Pwm.gpio_pwm(ena, speed)
    :ok = Pigpiox.Pwm.gpio_pwm(enb, speed)
    

    We can also control the motors to go forwards/backwards:

    # Forwards
    _ = Enum.map([in2, in4], &Pigpiox.GPIO.write(&1, 0))
    _ = Enum.map([in1, in3], &Pigpiox.GPIO.write(&1, 1))
    # Backwards
    _ = Enum.map([in1, in3], &Pigpiox.GPIO.write(&1, 0))
    _ = Enum.map([in2, in4], &Pigpiox.GPIO.write(&1, 1))
    

    wire everything up! The idea is that we used the TCP socket we opened for listening for Erlang binary terms that when decoded, will get translated into steering instructions, that we can then translate to GPIO commands.

    With the base logic drafted, we burned the firmware into the SD card and power up the device:

    MIX_TARGET=rpi4 mix do deps.get, compile, firmware, burn

    Next steps

    Moving on to the next part of the setup. We would need a way for sending commands to the car over the internet.

    In the firmware, we have a simple interface for translating steering commands into GPIO commands. We can export those facilities over our TCP socket:

    		
    		@impl true
    def handle_info({:tcp, _, data}, state) do
    msg = data
        |> :erlang.iolist_to_binary()
        |> :erlang.binary_to_term()
      case msg do
        {:speed, speed} ->
          Vehicle.speed(speed)
        steering when steering in @valid_moves ->
    Vehicle.direction(steering)
      end
      {:noreply, state}
    End
    			

    Keep in mind that we are using a naive approach at communicating with the back-end. A more robust mechanism would be needed if you plan to drive the car in a highway.

    3.1 The back-end

    The back-end is fairly easy and is left as an exercise to the reader. Our current implementation consists of a LiveView car controller, that consists of a couple of buttons for the steering and a slider for the speed. On user input, the LiveView process will encode the information to send it to the connected car via TCP:

    # ...
    def handle_event("speed", %{"value" => speed}, socket) do
      vehicle = socket.assigns.vehicle
      speed = String.to_integer(speed)
      Vehicles.speed(vehicle.pid, speed)
      {:noreply, assign(socket, speed: speed)}
    end
    def handle_event("stop", _params, socket) do
      vehicle = socket.assigns.vehicle
      Vehicles.stop(vehicle.pid)
      {:noreply, assign(socket, stopped: true)}
    end
    def handle_event(direction, _params, socket) do
      vehicle = socket.assigns.vehicle
      case direction do
        "left" -> Vehicles.left(vehicle.pid)
        "right" -> Vehicles.right(vehicle.pid)
        "forward" -> Vehicles.forward(vehicle.pid)
    "backwards" -> Vehicles.backwards(vehicle.pid)
      end
      {:noreply, socket}
    end
    # …
    

    4 Sources and conclusions

    We are now finished! Hopefully everything is put together and, you should have something that reassembles this:

    va

    We had fun assembling the kit and working with Nerves. It was easier than we expected, and we found that Nerves is a very stable and solid frame- work for deploying Elixir applications in restrained environments without friction.

    Now that we finished our first proof of concept, we are going to see if this idea can be scaled and enhanced. Stay tuned for more!

    All the source code is available under MIT licence under GitHub:

    • Jaguar-1 source code

    • Nerves project

    • Erlang Solutions

    Need help making the most of Elixir?

    You’re in the right place. We’ve worked with the world’s biggest companies to provide transformative, mission critical solutions across a range of industries including Fintech, Health Care and Utilities providers. Learn more about our Elixir services here. Or if you’re a developer looking to take your skills to the next level check out our training page.

    The post Building a Remote Control Car from Scratch Using Elixir appeared first on Erlang Solutions .

    • wifi_tethering open_in_new

      This post is public

      www.erlang-solutions.com /blog/building-a-remote-control-car-from-scratch-using-elixir/

    • chevron_right

      Ignite Realtime Blog: Help us translate Spark and Openfire!

      news.movim.eu / PlanetJabber • 24 December, 2022

    We have started to experiment with an online tool that facilitates the process of translating Spark and Openfire. Both already have a bunch of translations, but none are complete.

    I’m looking for people wanting to test the tool and/or provide translations. The aim is to make providing translations become so easy that little technological know-how is required.

    If you’re interested, please sign up to Ignite Realtime localization | Transifex and let me know what you think!

    1 post - 1 participant

    Read full topic

    • chevron_right

      Ignite Realtime Blog: New Mastodon service for the Ignite Realtime community!

      news.movim.eu / PlanetJabber • 21 December, 2022 • 1 minute

    Some of you might already have followed along with the discussion on this in the open_chat chatroom, but: the Ignite Realtime community now has its own Mastodon service at toot.igniterealtime.org ! This service is graciously sponsored by Free Solutions Sàrl - a big thank you to Claude and his team!

    Mastodon logo

    The idea is to have a Mastodon service with accounts from like-minded people with regards to open source / open standards real time collaboration. That way, both our local as well as federated timelines should become more applicable to us, as the users of this service, as compared to one of the larger, generic servers that are out there. Also, decentralizing by moving away from some of those gigantic services is a Good Thing©®!

    We are inviting our community members to join toot.igniterealtime.org ! If you don’t have a Mastodon account yet, or if you want an additional one, or want to migrate your existing account, please join us!

    At least for now, this server is not accepting public sign-ups. While we are gaining experience with running a Mastodon service, we will limit new accounts to people from the community that we recognize. This will largely be based on the trust levels that software running our forum is assigning to people.

    When you sign up on our Mastodon service , please use the same mail address that you used to sign up to our forum, so that we can cross-reference who’s who. It helps if you fill out your forum username in the answer to the “Why do you want to join?” question that’s part of the application. The approval process is manual, so please allow for some time for that to happen. If you think that we’ve missed your request (Mastodon doesn’t always send out notifications, it appears), please let us know by reaching out in the forum, or in the open_chat chatroom !

    We are looking forward to hearing from you in the Fediverse!

    1 post - 1 participant

    Read full topic

    • wifi_tethering open_in_new

      This post is public

      discourse.igniterealtime.org /t/new-mastodon-service-for-the-ignite-realtime-community/92389

    • chevron_right

      JMP: Newsletter: Busy Year in 2022

      news.movim.eu / PlanetJabber • 19 December, 2022 • 4 minutes

    Hi everyone!

    Welcome to the latest edition of your pseudo-monthly JMP update!

    In case it’s been a while since you checked out JMP, here’s a refresher: JMP lets you send and receive text and picture messages (and calls) through a real phone number right from your computer, tablet, phone, or anything else that has a Jabber client.  Among other things, JMP has these features: Your phone number on every device; Multiple phone numbers, one app; Free as in Freedom; Share one number with multiple people.

    Cheogram Android 2.11.0-1 has been released, including an important fix for creating new private group chats.  For some months creating such a group (a Jabber group, not a “group text”) with Cheogram Android has resulted in a public channel on many servers.  Please double-check your private groups and change settings if necessary!  This release will also be the first accepted into F-Droid with an up-to-date version of libwebrtc, so if you’ve had any issues with calls and use the F-Droid build, we recommend upgrading and trying again.  This release also adds support for tagging channels and group chats (on supporting servers, such as Snikket), better use of locales to determine what country code to prepend when dialling, a new OLED black theme, and more.

    The data plan roll out continues, accelerating in December but we know there are still many of you waiting.  Thank you so much for your patience, and to all the feedback we have received from users so far.  We are actively working on making the signup process self-serve so that the waitlist will no longer be necessary in the future.

    When JMP started we were just one part-time person.  As we grow, the legal structures that fit that time no longer do.  This fall we incorporated the MBOA Technology Co-operative to house JMP, Togethr , consulting work , and other activity.  This gives all our employees full agency in the company and gives us a firm legal footing for the future.  Nothing changes for you at this time, we’re still the same team, and for the time being you don’t even change the name you write on the cheques, nevertheless it marks a milestone in our life as a company.

    Year in Review

    This year, JMP and Snikket CIC made a deal to offer Jabber hosting as an option for JMP customers. This service is included in the regular JMP subscription and will eventually be the default option for new users during the sign-up process. JMP customers have been able to participate in a beta version of this integration, and JMP customers can contact JMP support to set up a Snikket instance directly.

    This year also saw international calling added to our list of features. JMP users are able to use as many minutes per month as they like, with approximately 120 minutes of credit to USA and Canada included by default. Customers are able to pay for additional minutes and make international calls, although users who are still paying with the old PayPal system will not have access to these features (or other features such as the data plan). We also implemented a per-calendar-month overage limit system, where customers can set their own limits to avoid unexpected charges. The default limit is currently set at $0/month.

    One of our most popular features has always been our voicemail and transcription, this year we expanded that to support multi-lingual transcriptions as well.

    We also added multi-account billing this year, an alpha for JMP use from Matrix, added two employees, created new bot commands for account management, launched Togethr to help people take control of their social media idenity, added support for SMS-only ports and the option to disable voicemail, built an XMPP integration for Chatwoot , and launched the JMP data plan .

    This year saw the launch and rapid development of the Cheogram Android app , forked from Conversations and including these and other improvements:

    • Add contacts without typing @cheogram.com
    • Integrate with the native Android Phone app (optional)
    • Address book integration (optional)
    • Option to start group texts easily
    • Command UI for better interactions with our and other bots (you can even sign up entirely from within the app !)
    • Rich text message display (including stickers from Movim users)
    • Data de-duplication for files sent/received multiple times
    • Message retraction
    • Ability to edit tags on contacts and channels
    • Tag navigation widget for easier conversation management
    • Ability to copy any link in a message to the clipboard
    • F-Droid repositories for quick updates of official builds

    Blog posts this year included: How to use Jabber from SMS , Why Bidirectional Gateways Matter , Computing International Call Rates with a Trie , Privacy and Threat Modelling , SMS Account Verification , and Writing a Chat Client from Scratch .

    To learn what’s happening with JMP between newsletters, here are some ways you can find out:

    Thanks for reading and have a wonderful rest of your week!

    • wifi_tethering open_in_new

      This post is public

      blog.jmp.chat /b/december-newsletter-2022

    • chevron_right

      Erlang Solutions: GraphQL interfaces in MongooseIM 6.0

      news.movim.eu / PlanetJabber • 19 December, 2022 • 9 minutes

    MongooseIM is a robust, scalable and highly extensible instant messaging server. Recent releases have improved its configurability and opened new use cases, and the latest version 6.0 continues that trend. By introducing the brand new GraphQL API, we made MongooseIM much easier to integrate with external web services. The entry barrier is also lower than ever because of the automatically generated API documentation, interactive web UI, and the new Command Line Interface (CLI), that can execute predefined GraphQL operations for you, providing help when necessary. The latest changes are exciting, but to fully understand and appreciate them, let’s start with summarizing the state of the API and CLI in the previous version 5.1.

    MongooseIM 5.1: API and CLI before the changes

    The primary interface exposed by MongooseIM is the c2s (client-to-server) listener using XMPP – an open, battle-proven, well-adopted and extensible protocol with an active community behind it. While we believe that XMPP is an excellent choice for an interface to which end user devices connect, there are important use cases when other interfaces might be useful:

    1. An administrator can use the Command Line Interface ( CLI ) to manage the server and its extensions.
    2. A user device or a consumer-facing Web UI can use the REST API to perform operations that don’t require an XMPP connection.
    3. An administrative web service can use the REST API to manage the server and its extensions.

    The diagram below shows the architecture of the interfaces that allow such operations:

    The REST API is pictured on the left. The top block contains a list of HTTP handlers that need to be configured in the TOML configuration file, mongooseim.tom l .

    There are two handler types here:

    • Admin API : mongoose_api_admin, mongoose_api, mongoose_domain_handler
    • User (client) API : mongoose_client_api, mongoose_api_client

    The module names can be confusing. This is because the API was developed in a few iterations over the years, and the design patterns have changed a few times. By default, the Client and Admin APIs are enabled for different HTTP listeners. This setup is recommended because the administrative API should be protected from being accessed by regular users. Some of them, like mongoose_client_api , contain several command categories that can be enabled individually. The handler modules are responsible for executing these commands, and they are doing so by either directly calling internal modules, or by executing commands registered in the REST command registry. The latter is more organized, but command registration is quite complicated. As a result, only some handler modules are doing so (as shown in the diagram). To register the appropriate commands in the REST command registry, you would configure the respective extensions modules, e.g. mod_commands contains most of the commands, but if you wanted to manage chat rooms, you’d need to enable mod_muc_light_commands .

    The CLI is shown on the right side of the diagram. The command handler is a module that gets called by Erlang RPC, which executes the commands registered in the CLI commands registry. To have them registered, you need to configure specific command categories in service_admin_extra .

    It is evident from the diagram that REST API logic is completely disjoint from the CLI one. Command registries are separate, and although they both use the Erlang ETS tables to store the commands, they have very different command formats. As a result, if the same command is available in both interfaces, it would need to be implemented twice. Code reuse is possible, but not enforced, so most often the logic is not shared. The configuration is also very different for both.

    Finally, and most importantly, many commands are implemented only for the Client REST API, only for the Admin REST API, or only for the CLI. We addressed all of these issues in the new release, so let’s see what the improved architecture looks like.

    MongooseIM 6.0: Introducing GraphQL

    The architecture of the new GraphQL interfaces is shown in the following diagram.

    The GraphQL API is handled by mongoose_graphql_handler , which is configured for an HTTP listener , and can be enabled for three different endpoints: Admin, Domain Admin and User.

    Let’s see how each of them is configured by default:

    The password-protected Admin endpoint is listening at 127.0.0.1:5551.

    [[listen.http]]
      ip_address = "127.0.0.1"
      port = 5551
    
      [[listen.http.handlers.mongoose_graphql_handler]]
    	host = "localhost"
    	path = "/api/graphql"
    	schema_endpoint = "admin"
    	username = "admin"
    	password = "secret"
    

    The Domain Admin is a new level between Admin and User, and is meant to be used by administrators of single XMPP domains. Domain credentials are configured by the global administrator with the Admin interface, and the domain administrator has to provide them when accessing the Domain Admin interface.

    [[listen.http]]
      ip_address = "0.0.0.0"
      port = 5541
    
      [[listen.http.handlers.mongoose_graphql_handler]]
    	host = "_"
    	path = "/api/graphql"
    	schema_endpoint = "domain_admin"
    

    Finally, the User endpoint requires providing the credentials of a specific XMPP user, and allows user-specific commands to be executed.

    [[listen.http]]
      ip_address = "0.0.0.0"
      port = 5561
    
      [[listen.http.handlers.mongoose_graphql_handler]]
    	host = "_"
    	path = "/api/graphql"
    	schema_endpoint = "user"
    

    The handlers are calling the GraphQL executor, which performs operations declared statically in two schemas: Admin and User. The Domain Admin uses the Admin schema, but there are special @protected directives which guarantee that a domain administrator can only execute these operations for their own domain. The commands are implemented as three different GraphQL operation types:

    • Queries for requesting information from the server, e.g. a user can request archived messages.
    • Mutations for performing an action on the server, e.g. a user can send a message.
    • Subscriptions for requesting a stream of updates from the server, e.g. a user can subscribe for incoming messages.

    Operation logic is defined in the resolver modules, which in turn call the internal API modules to execute the logic. This way there can be no ad-hoc calls to arbitrary internal logic in the resolver modules. There is no command registry required anymore, and a special @use directive in the schema ensures that modules and services required by each executed command are enabled.

    The GraphQL-based CLI handler module exposes the GraphQL commands from the Admin schema. Each command uses an automatically generated GraphQL query. This means that the same administrative commands are present in the HTTP GraphQL API and in the CLI. The old CLI is deprecated, and will be removed soon. The REST API will be still available for some time, but it will be phased out as well. In version 6.0, it was reworked to use the internal API modules, and it no longer requires the REST command registry. Thanks to the static schema, we could use SpectaQL to automatically generate the documentation for the Admin and User APIs. Another feature is the GraphiQL UI, which lets you experiment with the API in your browser. You can use browser plugins such as Altair as well.

    CLI and API in numbers

    All the operations offered by the old CLI and API are now available in the new GraphQL interfaces. We also added many new commands:

    Legacy CLI Legacy REST API GraphQL API and CLI
    Admin commands 56 32 114
    User commands 16 55

    Starting with the Admin (and Domain Admin) interfaces, we can compare the functionality offered by the legacy CLI, legacy REST API and the new GraphQL API. The diagram below shows the number of commands offered by each interface:

    For example, the domain category was unavailable with the legacy CLI, and the REST API offered four commands allowing you to add, remove, enable and disable dynamic XMPP domains. The new GraphQL commands offer them all, but also add two new ones, responsible for setting and removing domain-admin passwords. For some categories, like muc and muc_light (both implementing multi-user chat) we offer many more commands than before, allowing you to configure and use MongooseIM in new ways. The comparison looks similar for the User API:

    CLI and API in action

    The command line interface is the easiest one to use. To start, you only need to call the mongooseimctl command.

    Let’s assume that we need to add a new user, but we don’t know how to do so:

    $ mongooseimctl
    Usage: mongooseimctl [category] command [arguments]
    
    Most MongooseIM commands are grouped into the following categories:
      account     Account management 
      domain      Dynamic domain management 
      gdpr        Personal data management according to GDPR 
      httpUpload  Generating upload/download URLs for the files 
      inbox       Inbox bin flushing 
      last        Last activity management 
      metric      Browse metrics 
      mnesia      Mnesia internal database management 
      muc         MUC room management 
      muc_light   MUC Light room management 
      offline     Deleting old Offline messages 
      private     User private storage management 
      roster      User roster/contacts management 
      server      Server info and management 
      session     User session management 
      stanza      Sending stanzas and querying MAM 
      stat        Server statistics 
      token       OAUTH user token management 
      vcard       vCard management 
    
    To list the commands in a particular category:
      mongooseimctl category
    
    (...)
    

    The account category is the one we are interested in.

    $ mongooseimctl account
    Usage: mongooseimctl account command arguments
    
    The following commands are available in the category 'account':
      banUser             Ban an account: kick sessions and set a random password 
      changeUserPassword  Change the password of a user 
      checkPassword       Check if a password is correct 
      checkPasswordHash   Check if a password hash is correct (allowed methods: md5, sha)
      checkUser           Check if a user exists 
      countUsers          Get number of users per domain 
      listUsers           List users per domain 
      registerUser        Register a user. Username will be generated when skipped 
      removeUser          Remove the user's account along with all the associated personal data 
    
    To list the arguments for a particular command:
      mongooseimctl account command --help
    

    Now we know that the command is called registerUser .

    $ mongooseimctl account registerUser --help
    Usage: mongooseimctl account registerUser arguments
    
    Each argument has the format: --name value
    Available arguments are listed below with the corresponding GraphQL types:
      domain    DomainName! 
      password  String! 
      username  UserName 
    
    Scalar values do not need quoting unless they contain special characters or spaces.
    Complex input types are passed as JSON maps or lists, depending on the type.
    When a type is followed by '!', the corresponding argument is required.
    

    The arguments are listed with their GraphQL types. Both DomainName and UserName are essentially strings, but there are more complex types as well. To learn more about a particular type, use our online documentation .

    Knowing the arguments, you can create the user now:

    $ mongooseimctl account registerUser --username alice --domain localhost --password mysecret
    {
      "data" : {
        "account" : {
          "registerUser" : {
            "message" : "User alice@localhost successfully registered",
            "jid" : "alice@localhost"
          }
        }
      }
    }
    

    Now, let’s explore one more interface – the web UI provided by GraphiQL . Assuming that you have the GraphQL handlers set up with the default configuration, you can open http://localhost:5551/api/graphql in your browser, and enter the following query:

    You need to provide the authorization header as well. This one uses the default credentials, which you should of course change in your production environment. On the right you can see the results including the newly created user. You can use the Docs tab to learn more about a particular command.

    More than just the new API

    We have barely scratched the surface of the new features available in MongooseIM 6.0. Apart from the ones presented, these are some of the other improvements you can see in our latest release:

    • Dynamic domain removal is now asynchronous and incremental, which is useful if there is a lot of data to clean up.
    • Better pagination support for Inbox .
    • Internal rework of hooks and handlers.
    • Various improvements and fixes – see the Release Notes for details. We merged almost 200 pull requests since the last release.

    We are now working on more exciting features, so stay tuned, because we will have more news soon.

    The post GraphQL interfaces in MongooseIM 6.0 appeared first on Erlang Solutions .

    • chevron_right

      Erlang Solutions: Change data capture with Postgres & Elixir

      news.movim.eu / PlanetJabber • 13 December, 2022 • 19 minutes

    CDC is the process of identifying and capturing data changes from the database.

    With CDC, changes to data can be tracked in near real-time, and that information can be used to support a variety of use cases, including auditing, replication, and synchronisation.


    A good example of a use case for CDC is to consider an application which inserts a record into the database and pushes an event to a message queue after the record has been inserted (write-twice).

    Imagine you’re working on an e-commerce application and after an order is created and inserted into the database, you push an OrderCreated event to a message queue. The consumers of the event might do things such as create pick orders for the warehouse, schedule transports for delivery and send an order confirmation email to the customer.

    But what if the application crashes after the order has been inserted into the database but before managing to push the event to the message queue? This is possible due to the fact that you can’t atomically insert the record AND push the message in the same transaction, so if the application crashes after inserting the record to the database but before pushing the event to the queue, the event is lost.

    There are of course workarounds to circumvent this: a simple solution is to “outbox” the event into an outbox table in the same transaction as writing the record, andhen, rely on a CDC process to capture the change to the outbox table and push the event to the message queue. The transaction is atomic and the CDC process can assure the event

    gets delivered to the message queue at-least-once.

    In order to capture changes, CDC typically uses one of two methods: log-based or trigger-based.

    Log-based CDC involves reading the transaction logs of the database to identify changed data, which is the method we’ll use here by utilising Postgres Logical Replication.

    Postgres replication

    There are two modes of replication in Postgres:

    1. Physical replication – Every change from the primary are streamed to replicas via the WAL (Write Ahead Log). This replication is performed byte-by-byte with exact block addresses.

    2. Logical replication – In logical replication the subscriber receives each individual transactional change (i.e. INSERT, UPDATE, or DELETE statements) to the database.

    The WAL is still streamed but it encodes the logical operations so that they can be decoded by the subscriber without having to know Postgres internals.

    One of the great things about logical replication is that it can be used to only replicate specific tables or rows, meaning that you have complete control over what is being replicated.

    To enable logical replication the `wal_level` need to be set:

    ```sql
    -- determines how much information is written to the wal. 
    -- Each 'level' inherits the level below it; 'logical' is the highest level
    
    ALTER SYSTEM SET wal_level=logical;
    
    -- simultaneously running WAL sender processes
    ALTER SYSTEM SET max_wal_senders='10';
    
    -- simultaneously defined replication slots
    ALTER SYSTEM SET max_replication_slots='10';
    ```
    

    The changes require a restart to the Postgres instance.

    After the system has been restarted the `wal_level` can be verified with:

    ```sql
    SHOW wal_level;
     wal_level 
    -----------
     logical
    (1 row)
    

    In order to subscribe to changes a [publication](https://www.postgresql.org/docs/current/sql-createpublication.html) must be created. A publication is a group of tables in which we would like to receive data changes for.

    Let’s create a simple table and define a publication for it:

    ```sql
    CREATE TABLE articles (id serial PRIMARY KEY, title text, description text, body text);
    CREATE PUBLICATION articles_pub FOR TABLE articles;
    ```
    

    To tell postgres to retain WAL segments we must create a [replication slot](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html).

    The replication slot represents a stream of changes from one or more publications and is used to prevent data loss in the event of a server failure, as they are crash safe.


    Replication Protocol

    In order to get a feel for the protocol and messages being sent we can use [`pg_recvlogical`](https://www.postgresql.org/docs/current/app-pgrecvlogical.html) to start a replication subscriber:

    ```sh
    # Start and use the publication defined above
    # output is written to stdout
    pg_recvlogical --start \
      --host='localhost' \
      --port='5432' \
      --username='postgres' \
      --dbname='postgres' \
      --option=publication_names='articles_pub' \
      --option=proto_version=1 \
      --create-slot \
      --if-not-exists \
      --slot=articles_slot \
      --plugin=pgoutput \
      --file=-
    ```
    

    Insert a record:

    ```sql
    INSERT INTO articles (title, description, body)
        VALUES ('Postgres replication', 'Using logical replication', 'Foo bar baz');
    ```
    

    Each row in the output corresponds to a replication messages received through the subscription:

    ```
    B(egin) - Begin transaction 
    R(elation) - Table, schema, columns and their types
    I(insert) - Data being inserted
    C(ommit) - Commit transaction
    ```
    
    
    
    ```
    B
    
    Rarticlesdidtitledescriptionbody
    It35tPostgres replicationtUsing logical replicationtFoo bar baz
    C
    ```
    

    If we insert multiple records in a transaction we should have two I in between B and C:

    ```sql
    BEGIN;
    INSERT INTO articles (title, description, body) VALUES ('First', 'desc', 'Foo');
    
    INSERT INTO articles (title, description, body) VALUES ('Second', 'desc', 'Bar');
    COMMIT;
    ```
    
    And the ouput:
    ```
    C
    B
    
    It37tFirsttdesctFoo
    It38tSecondtdesctBar
    C
    ```
    
    

    The relation i.e table information was not transmitted since we already received the relation when inserting the first record.

    Postgres only sends the relation the first time it’s encountered during the session. The subscriber is expected to cache a previously sent relation.

    Now that we have a feel for how Logical replication works, let’s implement it in Elixir!

    Implementing the replication connection

    Create a new Elixir project:

    ```
    mix new cdc
    ```
    
    We'll add the following dependencies to `mix.exs`
    
    ```elixir
    defp deps do
      {:postgrex, "~> 0.16.4"},
      # decode/encode replication messages
      {:postgrex_pgoutput, "~> 0.1.0"}
    end
    ```
    
    
    `postgrex` supports replication through the [`Postgrex.ReplicationConnection`](https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html) process.
    
    
    ```elixir
    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
      require Logger
    
      defstruct [
        :publications,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok, %__MODULE__{slot: slot, publications: pubs}}
      end
    
      
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result([%Postgrex.Result{} | _], %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        Logger.debug("Received msg=#{inspect(msg, limit: :infinity, pretty: true)}")
        {:noreply, [], state}
      end
    
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    ```
    

    The code is available on GitHub

    Let’s try it out:

    ```elixir
    opts = [
      slot: "articles_slot_elixir",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432,
    ]
    
    CDC.Replication.start_link(opts)
    ```
    

    When we start the process the following is happening:

    1. Once we are connected to postgres the callback `handle_connect/1` is called, a temporary logical replication slot is created.

    2. `handle_result/2` is called with the result from the query in ‘1’. If the slot was created successfully we start streaming from the slot and go into streaming mode. The requested position ‘0/0’ means that Postgres picks the position.

    3. Any replication messages sent from postgres are received in the `handle_data/2` callback.

    Replication messages

    There are two types of messages a subscriber receives:

    1. `primary_keep_alive` – A checkin message, if `reply == 1` the subscriber is expected to reply to the message with a `standy_status_update` to avoid a timeout disconnect.

    The `standy_status_update` contains the current [LSN](https://www.postgresql.org/docs/current/datatype-pg-lsn.html) the subscriber has processed.

    Postgres uses this message to determine which WAL segments can be safely removed.

    2. `xlog_data` – Contains the data messages for each step in a transaction.

    Since we are not responding to the `primary_keep_alive` messages the process gets disconnected and restarts
    .

    Let’s fix it by decoding the messages and start replying with `standby_status_update` messages.

    ```elixir
    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      require Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.{Messages, Lsn}
    
      require Logger
    
      defstruct [
        :publications,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok, %__MODULE__{slot: slot, publications: pubs}}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        return_msgs =
          msg
          |> Messages.decode()
          |> handle_msg()
    
        {:noreply, return_msgs, state}
      end
    
      #
      defp handle_msg(Messages.msg_primary_keep_alive(server_wal: lsn, reply: 1)) do
        Logger.debug("msg_primary_keep_alive message reply=true")
        <<lsn::64>> = Lsn.encode(lsn)
    
        [standby_status_update(lsn)]
      end
    
      defp handle_msg(Messages.msg_primary_keep_alive(reply: 0)), do: []
    
      defp handle_msg(Messages.msg_xlog_data(data: data)) do
        Logger.debug("xlog_data message: #{inspect(data, pretty: true)}")
        []
      end
    
      defp standby_status_update(lsn) do
        [
          wal_recv: lsn + 1,
          wal_flush: lsn + 1,
          wal_apply: lsn + 1,
          system_clock: Messages.now(),
          reply: 0
        ]
        |> Messages.msg_standby_status_update()
        |> Messages.encode()
      end
    
      
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    ```
    


    `handle_data/2` decodes the message and passes it to `handle_msg/1`. If it’s a `primary_keep_alive`, we respond with a `standby_status_update`.

    The LSN denotes a byte position in the WAL.

    The subscriber responds with the LSN it has currently handled. Since we are not tracking the messages we receive, we simply ack with the LSN sent from the server.
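
    As an aside, the textual LSN form Postgres prints (for example `0/15EA5A0`) is simply the two 32-bit halves of that 64-bit position in hexadecimal. A small illustrative helper, not part of the CDC modules, that converts between the two:

    ```elixir
    defmodule LsnExample do
      import Bitwise

      # "0/15EA5A0" -> the 64-bit integer used in standby_status_update.
      def to_integer(lsn) when is_binary(lsn) do
        [hi, lo] = String.split(lsn, "/")
        bor(bsl(String.to_integer(hi, 16), 32), String.to_integer(lo, 16))
      end

      # 64-bit integer -> "hi/lo" text form.
      def to_text(int) when is_integer(int) do
        "#{Integer.to_string(bsr(int, 32), 16)}/#{Integer.to_string(band(int, 0xFFFFFFFF), 16)}"
      end
    end

    # LsnExample.to_integer("0/15EA5A0") |> LsnExample.to_text()
    #=> "0/15EA5A0"
    ```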

    Next we’ll handle `xlog_data` messages; the idea is to capture each operation into a transaction struct.

    Capturing transactions

    The `CDC.Protocol` module will handle `xlog_data` messages and track the state of the transaction.

    ```elixir
    defmodule CDC.Protocol do
      import Postgrex.PgOutput.Messages
      require Logger
    
      alias CDC.Tx
      alias Postgrex.PgOutput.Lsn
    
      
      @type t :: %__MODULE__{
              tx: Tx.t(),
              relations: map()
            }
    
      defstruct [
        :tx,
        relations: %{}
      ]
    
      @spec new() :: t()
      def new do
        %__MODULE__{}
      end
    
      def handle_message(msg, state) when is_binary(msg) do
        msg
        |> decode()
        |> handle_message(state)
      end
    
      def handle_message(msg_primary_keep_alive(reply: 0), state), do: {[], nil, state}
      def handle_message(msg_primary_keep_alive(server_wal: lsn, reply: 1), state) do
        Logger.debug("msg_primary_keep_alive message reply=true")
        <<lsn::64>> = Lsn.encode(lsn)
    
        {[standby_status_update(lsn)], nil, state}
      end
    
      def handle_message(msg, %__MODULE__{tx: nil, relations: relations} = state) do
        tx =
          [relations: relations, decode: true]
          |> Tx.new()
          |> Tx.build(msg)
    
        {[], nil, %{state | tx: tx}}
      end
    
      def handle_message(msg, %__MODULE__{tx: tx} = state) do
        case Tx.build(tx, msg) do
          # Bind the result so we finalize the *built* transaction.
          %Tx{state: :commit, relations: relations} = tx ->
            tx = Tx.finalize(tx)
            relations = Map.merge(state.relations, relations)
            {[], tx, %{state | tx: nil, relations: relations}}
    
          tx ->
            {[], nil, %{state | tx: tx}}
        end
      end
    
      defp standby_status_update(lsn) do
        [
          wal_recv: lsn + 1,
          wal_flush: lsn + 1,
          wal_apply: lsn + 1,
          system_clock: now(),
          reply: 0
        ]
        |> msg_standby_status_update()
        |> encode()
      end
    end
    ```
    `CDC.Tx` handles the messages received within a transaction: begin, relation, insert/update/delete, and commit.
    
    
    ```elixir
    defmodule CDC.Tx do
      import Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.Lsn
    
      alias __MODULE__.Operation
    
      @type t :: %__MODULE__{
              operations: [Operation.t()],
              relations: map(),
              timestamp: term(),
              xid: pos_integer(),
              state: :begin | :commit,
              lsn: Lsn.t(),
              end_lsn: Lsn.t()
            }
    
      defstruct [
        :timestamp,
        :xid,
        :lsn,
        :end_lsn,
        relations: %{},
        operations: [],
        state: :begin,
        decode: true
      ]
    
      def new(opts \\ []) do
        struct(__MODULE__, opts)
      end
    
      def finalize(%__MODULE__{state: :commit, operations: ops} = tx) do
        %{tx | operations: Enum.reverse(ops)}
      end
    
      def finalize(%__MODULE__{} = tx), do: tx
    
      @spec build(t(), tuple()) :: t()
      def build(tx, msg_xlog_data(data: data)) do
        build(tx, data)
      end
    
      def build(tx, msg_begin(lsn: lsn, timestamp: ts, xid: xid)) do
        %{tx | lsn: lsn, timestamp: ts, xid: xid, state: :begin}
      end
    
      def build(%__MODULE__{state: :begin, relations: relations} = tx, msg_relation(id: id) = rel) do
        %{tx | relations: Map.put(relations, id, rel)}
      end
    
      def build(%__MODULE__{state: :begin, lsn: tx_lsn} = tx, msg_commit(lsn: lsn, end_lsn: end_lsn))
          when tx_lsn == lsn do
        %{tx | state: :commit, end_lsn: end_lsn}
      end
    
      def build(%__MODULE__{state: :begin} = builder, msg_insert(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      def build(%__MODULE__{state: :begin} = builder, msg_update(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      def build(%__MODULE__{state: :begin} = builder, msg_delete(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      # skip unknown messages
      def build(%__MODULE__{} = tx, _msg), do: tx
    
      defp build_op(%__MODULE__{state: :begin, relations: rels, decode: decode} = tx, id, msg) do
        rel = Map.fetch!(rels, id)
        op = Operation.from_msg(msg, rel, decode)
    
        %{tx | operations: [op | tx.operations]}
      end
    end
    ```
    
    
    `CDC.Tx.Operation` handles INSERT/UPDATE/DELETE messages and decodes the data by combining it with the relation.
    
    
    ```elixir
    defmodule CDC.Tx.Operation do
      @moduledoc "Describes a change (INSERT, UPDATE, DELETE) within a transaction."
    
      import Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.Type, as: PgType
    
      @type t :: %__MODULE__{}
      defstruct [
        :type,
        :schema,
        :namespace,
        :table,
        :record,
        :old_record,
        :timestamp
      ]
    
      @spec from_msg(tuple(), tuple(), decode :: boolean()) :: t()
      def from_msg(
            msg_insert(data: data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :insert,
          namespace: ns,
          schema: into_schema(columns),
          table: name,
          record: cast(data, columns, decode?),
          old_record: %{}
        }
      end
    
      def from_msg(
            msg_update(change_data: data, old_data: old_data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :update,
          namespace: ns,
          table: name,
          schema: into_schema(columns),
          record: cast(data, columns, decode?),
          old_record: cast(old_data, columns, decode?)
        }
      end
    
      def from_msg(
            msg_delete(old_data: data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :delete,
          namespace: ns,
          schema: into_schema(columns),
          table: name,
          record: %{},
          old_record: cast(data, columns, decode?)
        }
      end
    
      defp into_schema(columns) do
        for c <- columns do
          c
          |> column()
          |> Enum.into(%{})
        end
      end
    
      defp cast(data, columns, decode?) do
        Enum.zip_reduce([data, columns], %{}, fn [text, typeinfo], acc ->
          key = column(typeinfo, :name)
    
          value =
            if decode? do
              t =
                typeinfo
                |> column(:type)
                |> PgType.type_info()
    
              PgType.decode(text, t)
            else
              text
            end
    
          Map.put(acc, key, value)
        end)
      end
    end
    ```
    

    As before, a `primary_keep_alive` message with `reply == 1` is answered with a `standby_status_update`. When we receive an `xlog_data` message, we create a new `%Tx{}` which we use to “build” the transaction until we receive a `msg_commit`, which marks the end of the transaction.

    Each insert, update, or delete message creates a `CDC.Tx.Operation` in the transaction; each operation contains a `relation_id`, which is used to look up the relation in `tx.relations`.

    The operation together with the relation enables us to decode the data: column and type information is retrieved from the relation and used to decode the values into Elixir terms.
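
    As a hand-traced illustration of what `cast/3` produces (hypothetical values; decoding a `:text` column is essentially the identity):

    ```elixir
    # One text column from the relation, zipped with its raw tuple value.
    columns = [{:column, [], "title", :text, -1}]
    data = ["Postgres replication"]

    # cast(data, columns, true) pairs each raw value with its column info,
    # decodes it according to its type, and returns:
    %{"title" => "Postgres replication"}
    ```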

    Once we reach the `commit` state, we merge `Tx.relations` into `Protocol.relations`. Since a relation message is only transmitted the first time a table is encountered during the connection session, `Protocol.relations` accumulates every `msg_relation` we’ve been sent during the session.

    The `CDC.Replication` module now looks like this:

    ```elixir
    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      alias CDC.Protocol
    
      require Logger
    
      defstruct [
        :publications,
        :protocol,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok,
         %__MODULE__{
           slot: slot,
           publications: pubs,
           protocol: Protocol.new()
         }}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
    
        if not is_nil(tx) do
          Logger.debug("Tx: #{inspect(tx, pretty: true)}")
        end
    
        {:noreply, return_msgs, %{state | protocol: protocol}}
      end
    
      
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    ```
    

    `handle_data/2` calls `Protocol.handle_message/2`, which returns a tuple with three elements: `{messages_to_send :: [binary()], complete_transaction :: CDC.Tx.t() | nil, CDC.Protocol.t()}`.
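
    Written out as a typespec (a sketch only; the module above does not declare one):

    ```elixir
    @spec handle_message(binary() | tuple(), CDC.Protocol.t()) ::
            {messages_to_send :: [binary()], CDC.Tx.t() | nil, CDC.Protocol.t()}
    ```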

    For now we just inspect the transaction when it’s emitted from `Protocol.handle_message/2`. Let’s try it out:

    ```elixir
    Interactive Elixir (1.14.0) - press Ctrl+C to exit (type h() ENTER for help)
    opts = [
      slot: "articles_slot_elixir",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432
    ]
    
    {:ok, _} = CDC.Replication.start_link(opts)
    {:ok, pid} = Postgrex.start_link(opts)
    
    insert_query = """
    INSERT INTO articles (title, description, body) 
    VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
    """
    
    _ = Postgrex.query!(pid, insert_query, [])
      
    14:03:48.020 [debug] Tx: %CDC.Tx{
      timestamp: ~U[2022-10-31 13:03:48Z],
      xid: 494,
      lsn: {0, 22981920},
      end_lsn: nil,
      relations: %{
        16386 => {:msg_relation, 16386, "public", "articles", :default,
         [
           {:column, [:key], "id", :int4, -1},
           {:column, [], "title", :text, -1},
           {:column, [], "description", :text, -1},
           {:column, [], "body", :text, -1}
         ]}
      },
      operations: [
        %CDC.Tx.Operation{
          type: :insert,
          schema: [
            %{flags: [:key], modifier: -1, name: "id", type: :int4},
            %{flags: [], modifier: -1, name: "title", type: :text},
            %{flags: [], modifier: -1, name: "description", type: :text},
            %{flags: [], modifier: -1, name: "body", type: :text}
          ],
          namespace: "public",
          table: "articles",
          record: %{
            "body" => "with Elixir!",
            "description" => "Using logical replication",
            "id" => 6,
            "title" => "Postgres replication"
          },
          old_record: %{},
          timestamp: nil
        }
      ],
      state: :begin,
      decode: true
    }
    ```
    

    Each change in the transaction is stored in `Tx.operations`, and `operation.record` is the decoded row as a map.
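
    For example, the decoded rows can be pulled out of a finished transaction like this (a small sketch, with `tx` bound as in the debug output above):

    ```elixir
    for %CDC.Tx.Operation{type: :insert, table: table, record: record} <- tx.operations do
      IO.puts("INSERT INTO #{table}: #{inspect(record)}")
    end
    ```
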
    Finally, let’s implement a way to subscribe to changes from `CDC.Replication`:

    ```elixir
    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      alias CDC.Protocol
    
      require Logger
    
      defstruct [
        :publications,
        :protocol,
        :slot,
        :state,
        subscribers: %{}
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      def subscribe(pid, opts \\ []) do
        Postgrex.ReplicationConnection.call(pid, :subscribe, Keyword.get(opts, :timeout, 5_000))
      end
    
      def unsubscribe(pid, ref, opts \\ []) do
        Postgrex.ReplicationConnection.call(
          pid,
          {:unsubscribe, ref},
          Keyword.get(opts, :timeout, 5_000)
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok,
         %__MODULE__{
           slot: slot,
           publications: pubs,
           protocol: Protocol.new()
         }}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
    
        if not is_nil(tx) do
          notify(tx, state.subscribers)
        end
    
        {:noreply, return_msgs, %{state | protocol: protocol}}
      end
    
      # Replies must be sent using `reply/2`
      # https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html#reply/2
      @impl true
      def handle_call(:subscribe, {pid, _} = from, state) do
        ref = Process.monitor(pid)
    
        state = put_in(state.subscribers[ref], pid)
    
        Postgrex.ReplicationConnection.reply(from, {:ok, ref})
    
        {:noreply, state}
      end
    
      def handle_call({:unsubscribe, ref}, from, state) do
        {reply, new_state} =
          case state.subscribers do
            %{^ref => _pid} ->
              Process.demonitor(ref, [:flush])
    
              {_, state} = pop_in(state.subscribers[ref])
              {:ok, state}
    
            _ ->
              {:error, state}
          end
    
        from && Postgrex.ReplicationConnection.reply(from, reply)
    
        {:noreply, new_state}
      end
    
      @impl true
      def handle_info({:DOWN, ref, :process, _, _}, state) do
        handle_call({:unsubscribe, ref}, nil, state)
      end
    
      defp notify(tx, subscribers) do
        for {ref, pid} <- subscribers do
          send(pid, {:notification, self(), ref, tx})
        end
    
        :ok
      end
    
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    ```
    

    And we can use it like this:

    ```elixir
    opts = [
      slot: "articles_slot",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432
    ]
    
    {:ok, pid} = CDC.Replication.start_link(opts)
    {:ok, pg_pid} = Postgrex.start_link(opts)
    {:ok, ref} = CDC.Replication.subscribe(pid)
    
    insert_query = """
    INSERT INTO articles (title, description, body) 
    VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
    """
    
    _ = Postgrex.query!(pg_pid, insert_query, [])
    flush()
    
    {:notification, #PID<0.266.0>, #Reference<0.2499916608.3416784901.94813>,
     %CDC.Tx{
       timestamp: ~U[2022-10-31 13:26:35Z],
       xid: 495,
       lsn: {0, 22983536},
       end_lsn: nil,
       relations: %{
         16386 => {:msg_relation, 16386, "public", "articles", :default,
          [
            {:column, [:key], "id", :int4, -1},
            {:column, [], "title", :text, -1},
            {:column, [], "description", :text, -1},
            {:column, [], "body", :text, -1}
          ]}
       },
       operations: [
         %CDC.Tx.Operation{
           type: :insert,
           schema: [
             %{flags: [:key], modifier: -1, name: "id", type: :int4},
             %{flags: [], modifier: -1, name: "title", type: :text},
             %{flags: [], modifier: -1, name: "description", type: :text},
             %{flags: [], modifier: -1, name: "body", type: :text}
           ],
           namespace: "public",
           table: "articles",
           record: %{
             "body" => "with Elixir!",
             "description" => "Using logical replication",
             "id" => 7,
             "title" => "Postgres replication"
           },
           old_record: %{},
           timestamp: nil
         }
       ],
       state: :begin,
       decode: true
     }}
    ```
    

    Conclusion

    If you’re looking for a way to capture changes from your database with minimal changes to your existing setup, Change Data Capture is definitely worth considering. With Elixir and Postgrex we’ve implemented a mini Debezium in ~400 LOC. The full source is available at https://github.com/drowzy/postgrex_pgoutput/tree/main/examples/cdc.

    If you need help with your Elixir implementation, our world-leading team of experts is always here to help. Contact us today to find out how we can help you.

    The post Change data capture with Postgres & Elixir appeared first on Erlang Solutions.


    • chevron_right

      Ignite Realtime Blog: Spark 3.0.1 Released

      news.movim.eu / PlanetJabber • 12 December, 2022 • 1 minute

    The Ignite Realtime community is happy to announce the release of Spark 3.0.1 version.

    This release mostly contains fixes. On macOS, Spark now uses the default FlatLaf look and feel. Users can also choose the tab style: “scroll” as in Spark 3.0.0 or “wrap” as in Spark 2.X (see the screenshot below). A bug that prevented Spark from saving chat history for some users has also been fixed.

    [Screenshot: tab style options in Spark 3.0.1]

    To do this, go to File → Preferences → Chat.

    The full list of changes can be found in the changelog.

    We encourage users and developers to get involved with the Spark project by providing feedback in the forums or submitting pull requests on our GitHub page.

    You can download Spark from the Downloads page. Below are the sha256 checksums:

    55b5efaaaa59e661d7e94b0f4168d37d383cd521c8a954a36fa7943339e197f6 *spark_3_0_1-64bit.exe
    5a6c2a10df14d1892216de188e1c2558ebd5f05ff4529f00fcb65ce30f2d4bce *spark_3_0_1-64bit.msi
    172b6fca86b43c370a7de1c7e2c05d6581341e656474b7bea868f5927804efb8 *spark_3_0_1-arm.exe
    b837ce77016e2a438e1dd9f2ef2d7752985b777be8dd4152296d7e48fc285fbb *spark_3_0_1-with-jre.dmg
    bf9ba305aaf5e763eca5fc8332c73b5c155b49e03a28c5352777aa577bf66a41 *spark_3_0_1-with-jre.exe
    a496956254bd65a87f65a266cf50e4b6c6ad71a371565ba91dc1e236cee39b8c *spark_3_0_1-with-jre.msi
    02001b7c17780c7aeb6d37f66efe898d291043fbbc201bb958f8af9b3b9abf52 *spark_3_0_1.deb
    7aa635154a4d34c401e871641626e7db3e48938d48f62f64d023c77d10fc1e89 *spark_3_0_1.dmg
    41ce2b95c0e43808359943f899a34054a72b570efd1183ff41848b79e26f2f38 *spark_3_0_1.exe
    5afdc4b1ab3ae6b77349b9d3e86003179af6b47b960535544843dd8542fd37f0 *spark_3_0_1.msi
    1e0f51db2d836ef3041ce354a7c7bbeec3b220781e8750cf1e027ad5ecf50cbc *spark_3_0_1.rpm
    ca35cb357f2e928db638f5eac2066f364b5c4af23bd558df1e6c18ae3854e6b7 *spark_3_0_1.sh
    ace373ad59d8fb928d6841a61ac06353a05c4374d9d30df86b875b0e77e9bbc4 *spark_3_0_1.tar.gz
    

    For other release announcements and news, follow us on Twitter.
