La captura de datos es el proceso de identificar y capturar cambios de datos en la base de datos.
Con captura de datos, los cambios en los datos pueden ser rastreados casi en tiempo real, y esa información puede ser utilizada para apoyar una variedad de casos de uso, incluyendo auditoría, replicación y sincronización.
Un buen ejemplo de un caso de uso para captura de datos es considerar una aplicación que inserta un registro en la base de datos y envía un evento a una cola de mensajes después de que se ha insertado el registro (escribir dos veces).
Imagina que estás trabajando en una aplicación de comercio electrónico y después de que se crea y se inserta un pedido en la base de datos, se envía un evento OrderCreated a una cola de mensajes. Los consumidores del evento podrían hacer cosas como crear órdenes de recolección para el almacén, programar transportes para la entrega y enviar un correo electrónico de confirmación del pedido al cliente.
Pero ¿qué sucede si la aplicación se bloquea después de que se ha insertado el pedido en la base de datos pero antes de lograr enviar el evento a la cola de mensajes? Esto es posible debido al hecho de que no se puede insertar atómicamente el registro Y enviar el mensaje en la misma transacción, por lo que si la aplicación se bloquea después de insertar el registro en la base de datos pero antes de enviar el evento a la cola, se pierde el evento.
Por supuesto, existen soluciones alternativas para evitar esto: una solución simple es “almacenar” el evento en una tabla de almacenamiento temporal en la misma transacción en la que se escribe el registro, y luego depender de un proceso captura de datos para capturar el cambio en la tabla de almacenamiento y enviar el evento a la cola de mensajes. La transacción es atómica y el proceso de captura de datos puede asegurar que el evento se entregue al menos una vez.
Para capturar cambios, la captura de datos típicamente utiliza uno de dos métodos: basado en registro o basado en disparadores.
La captura de datos basado en registro implica leer los registros de transacciones de la base de datos para identificar los cambios de datos, que es el método que utilizaremos aquí al utilizar la replicación lógica de Postgres.
Replicación de Postgres
Hay dos modos de replicación en Postgres:
-
Replicación física: cada cambio del primario se transmite a las réplicas a través del WAL (Write Ahead Log). Esta replicación se realiza byte por byte con direcciones de bloque exactas.
-
Replicación lógica: en la replicación lógica, el suscriptor recibe cada cambio de transacción individual (es decir, declaraciones INSERT, UPDATE o DELETE) en la base de datos.
El WAL todavía se transmite, pero codifica las operaciones lógicas para que puedan ser decodificadas por el suscriptor sin tener que conocer los detalles internos de Postgres.
Una de las grandes ventajas de la replicación lógica es que se puede utilizar para replicar sólo tablas o filas específicas, lo que significa que se tiene un control completo sobre lo que se está replicando.
Para habilitar la replicación lógica, el wal_level debe ser configurado:
-- determines how much information is written to the wal.
-- Each 'level' inherits the level below it; 'logical' is the highest level
ALTER SYSTEM SET wal_level=logical;
-- simultaneously running WAL sender processes
ALTER SYSTEM SET max_wal_senders='10';
-- simultaneously defined replication slots
ALTER SYSTEM SET max_replication_slots='10';
Los cambios requieren un reinicio de la instancia de Postgres.
Después de reiniciar el sistema, el wal_level se puede verificar con:
SHOW wal_level;
wal_level
-----------
logical
(1 row)
Para suscribirse a los cambios se debe crear una
publicación
. Una publicación es un grupo de tablas en las que nos gustaría recibir cambios de datos.
Vamos a crear una tabla simple y definir una publicación para ella:
CREATE TABLE articles (id serial PRIMARY KEY, title text, description text, body text);
CREATE PUBLICATION articles_pub FOR TABLE articles;
Para indicar a Postgres que retenga segmentos de WAL, debemos crear un slot de replicación.
El slot de replicación representa un flujo de cambios desde una o más publicaciones y se utiliza para prevenir la pérdida de datos en caso de una falla del servidor, ya que son a prueba de fallos.
Protocolo de Replicación
Para tener una idea del protocolo y los mensajes que se envían, podemos usar
pg_recvlogical
para iniciar un suscriptor de replicación:
# Start and use the publication defined above
# output is written to stdout
pg_recvlogical --start \
--host='localhost' \
--port='5432' \
--username='postgres' \
--dbname='postgres' \
--option=publication_names='articles_pub' \
--option=proto_version=1 \
--create-slot \
--if-not-exists \
--slot=articles_slot \
--plugin=pgoutput \
--file=-
Insertar un registro:
INSERT INTO articles (title, description, body)
VALUES ('Postgres replication', 'Using logical replication', 'Foo bar baz');
Cada linea en la salida corresponde a un mensaje de replicación recibido a través de suscripción:
B(egin) - Begin transaction
R(elation) - Table, schema, columns and their types
I(insert) - Data being inserted
C(ommit) - Commit transaction
___________________________________
B
Rarticlesdidtitledescriptionbody
It35tPostgres replicationtUsing logical replicationtFoo bar baz
C
Si insertamos múltiples registros en una transacción deberíamos tener dos I entre B y C:
BEGIN;
INSERT INTO articles (title, description, body) VALUES ('First', 'desc', 'Foo');
INSERT INTO articles (title, description, body) VALUES ('Second', 'desc', 'Bar');
COMMIT;
Y la salida:
C
B
It37tFirsttdesctFoo
It38tSecondtdesctBar
CCopied to clipboard!
La información de la relación, es decir, la tabla, no se transmitió porque ya se recibió la relación al insertar el primer registro.
Postgres solo envía la relación la primera vez que se encuentra durante la sesión. Se espera que el suscriptor almacene en caché una relación previamente enviada.
Ahora que tenemos una idea de cómo funciona la replicación lógica, ¡implementémosla en Elixir!
Implementando la conexión de replicación
Cree un nuevo proyecto de Elixir:
mix new cdc
Añadiremos las siguientes dependencias a mix.exs:
defp deps do
{:postgrex, "~> 0.16.4"},
# decode/encode replication messages
{:postgrex_pgoutput, "~> 0.1.0"}
end
Postgrex admite la replicación a través del proceso
Postgrex.ReplicationConnection.
defmodule CDC.Replication do
use Postgrex.ReplicationConnection
require Logger
defstruct [
:publications,
:slot,
:state
]
def start_link(opts) do
conn_opts = [auto_reconnect: true]
publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
Postgrex.ReplicationConnection.start_link(
__MODULE__,
{slot, publications},
conn_opts ++ opts
)
end
@impl true
def init({slot, pubs}) do
{:ok, %__MODULE__{slot: slot, publications: pubs}}
end
@impl true
def handle_connect(%__MODULE__{slot: slot} = state) do
query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
Logger.debug("[create slot] query=#{query}")
{:query, query, %{state | state: :create_slot}}
end
@impl true
def handle_result([%Postgrex.Result{} | _], %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state) do
opts = [proto_version: 1, publication_names: pubs]
query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
Logger.debug("[start streaming] query=#{query}")
{:stream, query, [], %{state | state: :streaming}}
end
@impl true
def handle_data(msg, state) do
Logger.debug("Received msg=#{inspect(msg, limit: :infinity, pretty: true)}")
{:noreply, [], state}
end
defp escape_options([]),
do: ""
defp escape_options(opts) do
parts =
Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
[?\s, ?(, parts, ?)]
end
defp escape_string(value) do
[?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
end
end
El código esta disponible en
GitHub
Probemos:
opts = [
slot: "articles_slot_elixir",
publications: ["articles_pub"],
host: "localhost",
database: "postgres",
username: "postgres",
password: "postgres",
port: 5432,
]
CDC.Replication.start_link(opts)
Cuando iniciamos el proceso, ocurre lo siguiente:
-
Una vez que estamos conectados a Postgres, se llama al callback handle_connect/1, se crea un slot de replicación lógica temporal.
-
Se llama a handle_result/2 con el resultado de la consulta en el paso 1. Si el slot se creó correctamente, comenzamos a transmitir desde el slot y entramos en el modo de transmisión. La posición solicitada ‘0/0’ significa que Postgres elige la posición.
-
Cualquier mensaje de replicación enviado desde Postgres se recibe en el callback handle_data/2.
Mensajes de replicación
Hay dos tipos de mensajes que un suscriptor recibe:
-
primary_keep_alive: un mensaje de comprobación, si reply == 1 se espera que el suscriptor responda al mensaje con un standby_status_update para evitar una desconexión por tiempo de espera.
El standby_status_update contiene el LSN actual que el suscriptor ha procesado.
Postgres utiliza este mensaje para determinar qué segmentos de WAL se pueden eliminar de forma segura.
-
xlog_data: contiene los mensajes de datos para cada paso en una transacción.Dado que no estamos respondiendo a los mensajes primary_keep_alive, el proceso se desconecta y se reinicia.
Arreglemos esto decodificando los mensajes y comenzando a responder con mensajes standby_status_update.
defmodule CDC.Replication do
use Postgrex.ReplicationConnection
require Postgrex.PgOutput.Messages
alias Postgrex.PgOutput.{Messages, Lsn}
require Logger
defstruct [
:publications,
:slot,
:state
]
def start_link(opts) do
conn_opts = [auto_reconnect: true]
publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
Postgrex.ReplicationConnection.start_link(
__MODULE__,
{slot, publications},
conn_opts ++ opts
)
end
@impl true
def init({slot, pubs}) do
{:ok, %__MODULE__{slot: slot, publications: pubs}}
end
@impl true
def handle_connect(%__MODULE__{slot: slot} = state) do
query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
Logger.debug("[create slot] query=#{query}")
{:query, query, %{state | state: :create_slot}}
end
@impl true
def handle_result(
[%Postgrex.Result{} | _],
%__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
) do
opts = [proto_version: 1, publication_names: pubs]
query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
Logger.debug("[start streaming] query=#{query}")
{:stream, query, [], %{state | state: :streaming}}
end
@impl true
def handle_data(msg, state) do
return_msgs =
msg
|> Messages.decode()
|> handle_msg()
{:noreply, return_msgs, state}
end
#
defp handle_msg(Messages.msg_primary_keep_alive(server_wal: lsn, reply: 1)) do
Logger.debug("msg_primary_keep_alive message reply=true")
<<lsn::64>> = Lsn.encode(lsn)
[standby_status_update(lsn)]
end
defp handle_msg(Messages.msg_primary_keep_alive(reply: 0)), do: []
defp handle_msg(Messages.msg_xlog_data(data: data)) do
Logger.debug("xlog_data message: #{inspect(data, pretty: true)}")
[]
end
defp standby_status_update(lsn) do
[
wal_recv: lsn + 1,
wal_flush: lsn + 1,
wal_apply: lsn + 1,
system_clock: Messages.now(),
reply: 0
]
|> Messages.msg_standby_status_update()
|> Messages.encode()
end
defp escape_options([]),
do: ""
defp escape_options(opts) do
parts =
Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
[?\s, ?(, parts, ?)]
end
defp escape_string(value) do
[?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
end
end
handle_data/2
decodifica el mensaje y lo pasa a handle_msg/1. Si es un
primary_keep_alive
, respondemos con un
standby_status_update.
El LSN denota una posición de byte en el WAL.
El suscriptor responde con el LSN que ha manejado actualmente, como no estamos haciendo seguimiento de los mensajes que recibimos, simplemente confirmamos con el LSN enviado desde el servidor.
A continuación, manejaremos los mensajes xlog_data, la idea aquí es que capturaremos cada operación en una estructura de transacción.
Capturando transacciones
El módulo CDC.Protocol manejará los mensajes xlog_data y rastreará el estado de la transacción
defmodule CDC.Protocol do
import Postgrex.PgOutput.Messages
require Logger
alias CDC.Tx
alias Postgrex.PgOutput.Lsn
@type t :: %__MODULE__{
tx: Tx.t(),
relations: map()
}
defstruct [
:tx,
relations: %{}
]
@spec new() :: t()
def new do
%__MODULE__{}
end
def handle_message(msg, state) when is_binary(msg) do
msg
|> decode()
|> handle_message(state)
end
def handle_message(msg_primary_keep_alive(reply: 0), state), do: {[], nil, state}
def handle_message(msg_primary_keep_alive(server_wal: lsn, reply: 1), state) do
Logger.debug("msg_primary_keep_alive message reply=true")
<<lsn::64>> = Lsn.encode(lsn)
{[standby_status_update(lsn)], nil, state}
end
def handle_message(msg, %__MODULE__{tx: nil, relations: relations} = state) do
tx =
[relations: relations, decode: true]
|> Tx.new()
|> Tx.build(msg)
{[], nil, %{state | tx: tx}}
end
def handle_message(msg, %__MODULE__{tx: tx} = state) do
case Tx.build(tx, msg) do
%Tx{state: :commit, relations: relations} ->
tx = Tx.finalize(tx)
relations = Map.merge(state.relations, relations)
{[], tx, %{state | tx: nil, relations: relations}}
tx ->
{[], nil, %{state | tx: tx}}
end
end
defp standby_status_update(lsn) do
[
wal_recv: lsn + 1,
wal_flush: lsn + 1,
wal_apply: lsn + 1,
system_clock: now(),
reply: 0
]
|> msg_standby_status_update()
|> encode()
end
end
CDC.Tx maneja mensajes recibidos dentro de la transacción, begin, relation, insert/update/delete y commit.
defmodule CDC.Tx do
import Postgrex.PgOutput.Messages
alias Postgrex.PgOutput.Lsn
alias __MODULE__.Operation
@type t :: %__MODULE__{
operations: [Operation.t()],
relations: map(),
timestamp: term(),
xid: pos_integer(),
state: :begin | :commit,
lsn: Lsn.t(),
end_lsn: Lsn.t()
}
defstruct [
:timestamp,
:xid,
:lsn,
:end_lsn,
relations: %{},
operations: [],
state: :begin,
decode: true
]
def new(opts \\ []) do
struct(__MODULE__, opts)
end
def finalize(%__MODULE__{state: :commit, operations: ops} = tx) do
%{tx | operations: Enum.reverse(ops)}
end
def finalize(%__MODULE__{} = tx), do: tx
@spec build(t(), tuple()) :: t()
def build(tx, msg_xlog_data(data: data)) do
build(tx, data)
end
def build(tx, msg_begin(lsn: lsn, timestamp: ts, xid: xid)) do
%{tx | lsn: lsn, timestamp: ts, xid: xid, state: :begin}
end
def build(%__MODULE__{state: :begin, relations: relations} = tx, msg_relation(id: id) = rel) do
%{tx | relations: Map.put(relations, id, rel)}
end
def build(%__MODULE__{state: :begin, lsn: tx_lsn} = tx, msg_commit(lsn: lsn, end_lsn: end_lsn))
when tx_lsn == lsn do
%{tx | state: :commit, end_lsn: end_lsn}
end
def build(%__MODULE__{state: :begin} = builder, msg_insert(relation_id: id) = msg),
do: build_op(builder, id, msg)
def build(%__MODULE__{state: :begin} = builder, msg_update(relation_id: id) = msg),
do: build_op(builder, id, msg)
def build(%__MODULE__{state: :begin} = builder, msg_delete(relation_id: id) = msg),
do: build_op(builder, id, msg)
# skip unknown messages
def build(%__MODULE__{} = tx, _msg), do: tx
defp build_op(%__MODULE__{state: :begin, relations: rels, decode: decode} = tx, id, msg) do
rel = Map.fetch!(rels, id)
op = Operation.from_msg(msg, rel, decode)
%{tx | operations: [op | tx.operations]}
end
end
CDC.Tx.Operation maneja los mensajes INSERT/UPDATE/DELETE y decodifica los datos combinándolos con la relación
defmodule CDC.Tx.Operation do
@moduledoc "Describes a change (INSERT, UPDATE, DELETE) within a transaction."
import Postgrex.PgOutput.Messages
alias Postgrex.PgOutput.Type, as: PgType
@type t :: %__MODULE__{}
defstruct [
:type,
:schema,
:namespace,
:table,
:record,
:old_record,
:timestamp
]
@spec from_msg(tuple(), tuple(), decode :: boolean()) :: t()
def from_msg(
msg_insert(data: data),
msg_relation(columns: columns, namespace: ns, name: name),
decode?
) do
%__MODULE__{
type: :insert,
namespace: ns,
schema: into_schema(columns),
table: name,
record: cast(data, columns, decode?),
old_record: %{}
}
end
def from_msg(
msg_update(change_data: data, old_data: old_data),
msg_relation(columns: columns, namespace: ns, name: name),
decode?
) do
%__MODULE__{
type: :update,
namespace: ns,
table: name,
schema: into_schema(columns),
record: cast(data, columns, decode?),
old_record: cast(columns, old_data, decode?)
}
end
def from_msg(
msg_delete(old_data: data),
msg_relation(columns: columns, namespace: ns, name: name),
decode?
) do
%__MODULE__{
type: :delete,
namespace: ns,
schema: into_schema(columns),
table: name,
record: %{},
old_record: cast(data, columns, decode?)
}
end
defp into_schema(columns) do
for c <- columns do
c
|> column()
|> Enum.into(%{})
end
end
defp cast(data, columns, decode?) do
Enum.zip_reduce([data, columns], %{}, fn [text, typeinfo], acc ->
key = column(typeinfo, :name)
value =
if decode? do
t =
typeinfo
|> column(:type)
|> PgType.type_info()
PgType.decode(text, t)
else
text
end
Map.put(acc, key, value)
end)
end
end
Como antes, el mensaje primary_keep_alive con reply == 1 envía un standby_status_update. Cuando recibimos un mensaje xlog_data, creamos un nuevo %Tx{} que usamos para “construir” la transacción hasta que recibimos un msg_commit que marca el final de la transacción.
Cualquier mensaje de inserción, actualización o eliminación crea una CDC.Tx.Operation en la transacción, cada operación contiene un relation_id que se utiliza para buscar la relación desde tx.relations.
La operación junto con la relación nos permite decodificar los datos. La información de columna y tipo se recupera de la relación y se utiliza para decodificar los valores en términos de Elixir.
Una vez que estamos en un estado de commit, fusionamos Tx.relations con Protocol.relations, ya que un mensaje de relación sólo se transmitirá la primera vez que se encuentre una tabla durante la sesión de conexión, Protocol.relations contiene todos los msg_relation que se nos han enviado durante la sesión.
El módulo CDC.Replication ahora se ve así:
defmodule CDC.Replication do
use Postgrex.ReplicationConnection
alias CDC.Protocol
require Logger
defstruct [
:publications,
:protocol,
:slot,
:state
]
def start_link(opts) do
conn_opts = [auto_reconnect: true]
publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
Postgrex.ReplicationConnection.start_link(
__MODULE__,
{slot, publications},
conn_opts ++ opts
)
end
@impl true
def init({slot, pubs}) do
{:ok,
%__MODULE__{
slot: slot,
publications: pubs,
protocol: Protocol.new()
}}
end
@impl true
def handle_connect(%__MODULE__{slot: slot} = state) do
query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
Logger.debug("[create slot] query=#{query}")
{:query, query, %{state | state: :create_slot}}
end
@impl true
def handle_result(
[%Postgrex.Result{} | _],
%__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
) do
opts = [proto_version: 1, publication_names: pubs]
query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
Logger.debug("[start streaming] query=#{query}")
{:stream, query, [], %{state | state: :streaming}}
end
@impl true
def handle_data(msg, state) do
{return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
if not is_nil(tx) do
Logger.debug("Tx: #{inspect(tx, pretty: true)}")
end
{:noreply, return_msgs, %{state | protocol: protocol}}
end
defp escape_options([]),
do: ""
defp escape_options(opts) do
parts =
Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
[?\s, ?(, parts, ?)]
end
defp escape_string(value) do
[?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
end
end
handle_data/2 llama a Protocol.handle_message/1 que devuelve una tupla con tres elementos {messages_to_send :: [binary()], complete_transaction :: CDC.Tx.t() | nil, CDC.Protocolo.t()}
Por ahora solo inspeccionamos la transacción cuando se emite desde Protocol.handle_message/3, probémoslo:
Interactive Elixir (1.14.0) - press Ctrl+C to exit (type h() ENTER for help)
opts = [
slot: "articles_slot_elixir",
publications: ["articles_pub"],
host: "localhost",
database: "postgres",
username: "postgres",
password: "postgres",
port: 5432,
]
{:ok, _} = CDC.Replication.start_link(opts)
{:ok, pid} = Postgrex.start_link(opts)
insert_query = """
INSERT INTO articles (title, description, body)
VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
"""
_ = Postgrex.query!(pid, insert_query, [])
14:03:48.020 [debug] Tx: %CDC.Tx{
timestamp: ~U[2022-10-31 13:03:48Z],
xid: 494,
lsn: {0, 22981920},
end_lsn: nil,
relations: %{
16386 => {:msg_relation, 16386, "public", "articles", :default,
[
{:column, [:key], "id", :int4, -1},
{:column, [], "title", :text, -1},
{:column, [], "description", :text, -1},
{:column, [], "body", :text, -1}
]}
},
operations: [
%CDC.Tx.Operation{
type: :insert,
schema: [
%{flags: [:key], modifier: -1, name: "id", type: :int4},
%{flags: [], modifier: -1, name: "title", type: :text},
%{flags: [], modifier: -1, name: "description", type: :text},
%{flags: [], modifier: -1, name: "body", type: :text}
],
namespace: "public",
table: "articles",
record: %{
"body" => "with Elixir!",
"description" => "Using logical replication",
"id" => 6,
"title" => "Postgres replication"
},
old_record: %{},
timestamp: nil
}
],
state: :begin,
decode: true
}
Cada cambio en la transacción se almacena en Tx.operations, operation.record es la fila decodificada como un mapa.
Finalmente, implementemos una forma de suscribirnos a los cambios de CDC.Replication:
defmodule CDC.Replication do
use Postgrex.ReplicationConnection
alias CDC.Protocol
require Logger
defstruct [
:publications,
:protocol,
:slot,
:state,
subscribers: %{}
]
def start_link(opts) do
conn_opts = [auto_reconnect: true]
publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
Postgrex.ReplicationConnection.start_link(
__MODULE__,
{slot, publications},
conn_opts ++ opts
)
end
def subscribe(pid, opts \\ []) do
Postgrex.ReplicationConnection.call(pid, :subscribe, Keyword.get(opts, :timeout, 5_000))
end
def unsubscribe(pid, ref, opts \\ []) do
Postgrex.ReplicationConnection.call(
pid,
{:unsubscribe, ref},
Keyword.get(opts, :timeout, 5_000)
)
end
@impl true
def init({slot, pubs}) do
{:ok,
%__MODULE__{
slot: slot,
publications: pubs,
protocol: Protocol.new()
}}
end
@impl true
def handle_connect(%__MODULE__{slot: slot} = state) do
query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
Logger.debug("[create slot] query=#{query}")
{:query, query, %{state | state: :create_slot}}
end
@impl true
def handle_result(
[%Postgrex.Result{} | _],
%__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
) do
opts = [proto_version: 1, publication_names: pubs]
query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
Logger.debug("[start streaming] query=#{query}")
{:stream, query, [], %{state | state: :streaming}}
end
@impl true
def handle_data(msg, state) do
{return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
if not is_nil(tx) do
notify(tx, state.subscribers)
end
{:noreply, return_msgs, %{state | protocol: protocol}}
end
# Replies must be sent using `reply/2`
# https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html#reply/2
@impl true
def handle_call(:subscribe, {pid, _} = from, state) do
ref = Process.monitor(pid)
state = put_in(state.subscribers[ref], pid)
Postgrex.ReplicationConnection.reply(from, {:ok, ref})
{:noreply, state}
end
def handle_call({:unsubscribe, ref}, from, state) do
{reply, new_state} =
case state.subscribers do
%{^ref => _pid} ->
Process.demonitor(ref, [:flush])
{_, state} = pop_in(state.subscribers[ref])
{:ok, state}
_ ->
{:error, state}
end
from && Postgrex.ReplicationConnection.reply(from, reply)
{:noreply, new_state}
end
@impl true
def handle_info({:DOWN, ref, :process, _, _}, state) do
handle_call({:unsubscribe, ref}, nil, state)
end
defp notify(tx, subscribers) do
for {ref, pid} <- subscribers do
send(pid, {:notification, self(), ref, tx})
end
:ok
end
defp escape_options([]),
do: ""
defp escape_options(opts) do
parts =
Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
[?\s, ?(, parts, ?)]
end
defp escape_string(value) do
[?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
end
end
Y lo podemos usar así:
opts = [
slot: "articles_slot",
publications: ["articles_pub"],
host: "localhost",
database: "postgres",
username: "postgres",
password: "postgres",
port: 5432,
]
{:ok, pid} = CDC.Replication.start_link(opts)
{:ok, pg_pid} = Postgrex.start_link(opts)
{:ok, ref} = CDC.Replication.subscribe(pid)
insert_query = """
INSERT INTO articles (title, description, body)
VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
"""
_ = Postgrex.query!(pg_pid, insert_query, [])
flush()
{:notification, #PID<0.266.0>, #Reference<0.2499916608.3416784901.94813>,
%CDC.Tx{
timestamp: ~U[2022-10-31 13:26:35Z],
xid: 495,
lsn: {0, 22983536},
end_lsn: nil,
relations: %{
16386 => {:msg_relation, 16386, "public", "articles", :default,
[
{:column, [:key], "id", :int4, -1},
{:column, [], "title", :text, -1},
{:column, [], "description", :text, -1},
{:column, [], "body", :text, -1}
]}
},
operations: [
%CDC.Tx.Operation{
type: :insert,
schema: [
%{flags: [:key], modifier: -1, name: "id", type: :int4},
%{flags: [], modifier: -1, name: "title", type: :text},
%{flags: [], modifier: -1, name: "description", type: :text},
%{flags: [], modifier: -1, name: "body", type: :text}
],
namespace: "public",
table: "articles",
record: %{
"body" => "with Elixir!",
"description" => "Using logical replication",
"id" => 7,
"title" => "Postgres replication"
},
old_record: %{},
timestamp: nil
}
],
state: :begin,
decode: true
}}
Conclusión
Si está buscando una manera de capturar cambios de su base de datos con cambios mínimos en su configuración existente, definitivamente vale la pena considerar Cambiar la captura de datos. Con Elixir y postgrex hemos implementado un mini Debezium en ~400 LOC. La fuente completa está disponible
aquí
.
Si necesita ayuda con la implementación de Elixir, nuestro equipo de expertos líder en el mundo siempre está aquí para ayudarlo.
Contáctenos
hoy para saber cómo podemos ayudarlo.
The post
Captura de datos con Postgres y Elixir
appeared first on
Erlang Solutions
.