Logical replication in Postgres: Basics
In this post we'll explore the basics of logical replication between two Postgres databases, as both a user and a developer.

## Background

Postgres first implemented physical replication, where it ships bytes on disk from one database A to another database B. Database B writes those bytes right back to disk. But physical replication limits you to replicating between Postgres instances running the same Postgres version on the same CPU architecture (and likely with other settings, like page size, matching).

Logical replication replicates specific messages (e.g. insert, update, delete, truncate) over the network in a mostly stable format. This allows us to replicate data between databases running different Postgres versions. You can also filter rows or columns when replicating. Swanky!

The drawback is that, since logical replication is newer than physical replication, it is still maturing. You cannot yet, for example, get DDL changes over logical replication.

In any case, I have never set up basic logical replication in Postgres before, so that's what I'm going to do in this post. :)

## Setup

First, build Postgres 17 from source.
(This is important because later we'll apply some changes to help see how logical replication works.)

```
$ git clone https://github.com/postgres/postgres
$ cd postgres
$ git checkout REL_17_STABLE
$ ./configure --with-libxml --without-icu --prefix=$(pwd)/build --libdir=$(pwd)/build/lib --enable-cassert --enable-debug
$ make -j8
$ make install
```

Next, create two Postgres instances.

```
$ ./build/bin/initdb testdb1
$ ./build/bin/initdb testdb2
```

Give them both a unique port to run on, and set `wal_level` to `logical` so we can use logical replication.

```
$ printf '\nport = 6001\nwal_level = logical' >> testdb1/postgresql.conf
$ printf '\nport = 6002\nwal_level = logical' >> testdb2/postgresql.conf
```

Then start both databases.

```
$ ./build/bin/pg_ctl -D testdb1 -l logfile1 start
$ ./build/bin/pg_ctl -D testdb2 -l logfile2 start
```

## Setting up logical replication

We'll do something very simple: create a single table and insert a couple of rows into it in testdb1. We want those rows to end up in the same table in testdb2.

We cannot replicate DDL with logical replication, so we'll have to run the CREATE TABLE statement on both databases first.

```
$ psql -h localhost -p 6001 postgres -c "CREATE TABLE users (name TEXT PRIMARY KEY, age INT);"
CREATE TABLE
$ psql -h localhost -p 6002 postgres -c "CREATE TABLE users (name TEXT PRIMARY KEY, age INT);"
CREATE TABLE
```

Now we need to tell testdb1 (at port 6001) to publish changes to this table. We call the publication `pub`.

```
$ psql -h localhost -p 6001 postgres -c "CREATE PUBLICATION pub FOR TABLE users;"
CREATE PUBLICATION
```

And we need to tell testdb2 (at port 6002) to subscribe to changes from testdb1.
We need to tell it we're referring to the `pub` publication created above on testdb1.

```
$ psql -h localhost -p 6002 postgres -c "CREATE SUBSCRIPTION sub CONNECTION 'port=6001 dbname=postgres' PUBLICATION pub;"
NOTICE:  created replication slot "sub" on publisher
CREATE SUBSCRIPTION
```

The table on both databases is empty.

```
$ psql -h localhost -p 6001 postgres -c "SELECT * FROM users;"
 name | age
------+-----
(0 rows)

$ psql -h localhost -p 6002 postgres -c "SELECT * FROM users;"
 name | age
------+-----
(0 rows)
```

Now let's insert two rows in testdb1.

```
$ psql -h localhost -p 6001 postgres -c "INSERT INTO users VALUES ('deb', 12);"
INSERT 0 1
$ psql -h localhost -p 6001 postgres -c "INSERT INTO users VALUES ('kevin', 13);"
INSERT 0 1
```

And now let's query testdb2.

```
$ psql -h localhost -p 6002 postgres -c "SELECT * FROM users;"
 name  | age
-------+-----
 deb   |  12
 kevin |  13
(2 rows)
```

Very nice!

Now let's take a tiny peek under the hood.

## Architecture

Postgres's process model makes it pretty easy to see what is going on. Let's look at the process tree.

```
$ pstree -s postgres
-+= 00001 root /sbin/launchd
 |-+= 87091 phil /Users/phil/edb/postgres17/build/bin/postgres -D testdb2
 | |--= 87092 phil postgres: checkpointer
 | |--= 87093 phil postgres: background writer
 | |--= 87095 phil postgres: walwriter
 | |--= 87096 phil postgres: autovacuum launcher
 | |--= 87097 phil postgres: logical replication launcher
 | \--= 89689 phil postgres: logical replication apply worker for subscription 16407
 \-+= 87135 phil /Users/phil/edb/postgres17/build/bin/postgres -D testdb1
   |--= 87136 phil postgres: checkpointer
   |--= 87137 phil postgres: background writer
   |--= 87139 phil postgres: walwriter
   |--= 87140 phil postgres: autovacuum launcher
   |--= 87141 phil postgres: logical replication launcher
   \--= 89696 phil postgres: walsender phil postgres [local] START_REPLICATION
```

On the publishing side Postgres creates a new process called walsender.
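Before digging into the walsender, note that you can also see this replication state from SQL, not just the process tree. As a quick sketch using the standard statistics views: the publisher reports connected walsenders in `pg_stat_replication`, and the subscriber reports its apply workers in `pg_stat_subscription`.

```shell
# On the publisher (testdb1): one row per connected walsender.
psql -h localhost -p 6001 postgres -c \
  "SELECT application_name, state FROM pg_stat_replication;"

# On the subscriber (testdb2): one row per subscription worker.
psql -h localhost -p 6002 postgres -c \
  "SELECT subname, received_lsn FROM pg_stat_subscription;"
```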
The name walsender sounds like a remnant of physical replication having been implemented first historically, where Postgres literally ships one WAL segment (usually 16MB) at a time from database to database. Indeed, the walsender.c code handles both physical and logical replication.

A walsender process starts up when a subscriber connects to replicate from a publication (itself created with the CREATE PUBLICATION SQL command). Each replication connection gets its own walsender process.

On the subscribing side we get a new process called the apply worker. The apply worker source is in worker.c. This code is specific to logical replication. This process connects to the publishing Postgres database and ends up speaking with the walsender process.

The apply worker process starts up when a subscription is configured, for example with the CREATE SUBSCRIPTION SQL command. Each subscription gets its own apply worker process.

Communication between the apply worker and the walsender happens through what is called the logical replication protocol.

To make this a little more tangible, let's edit the Postgres source code. We'll add logs in the walsender process when we send an INSERT change, and logs in the apply worker process when we receive an INSERT change.

## Hooking on send

walsender.c itself doesn't handle any WAL records. That is done by what's called the output plugin, which runs within the walsender process. The default format for the WAL records on the wire between the walsender and the apply worker is pgoutput, implemented in pgoutput.c. This format is set as the default in libpqwalreceiver.c.

Typical users don't need to think about the format. CREATE SUBSCRIPTION doesn't even let you pick it. You can only override the default format by calling pg_create_logical_replication_slot, passing that function a different output plugin, and telling CREATE SUBSCRIPTION not to create a new slot and to instead use the one you created manually with pg_create_logical_replication_slot.
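To see what a non-default output plugin looks like without wiring up a whole subscription, here's a sketch using the test_decoding plugin that ships with Postgres (the slot name `manual_slot` is made up for this example):

```shell
# Create a logical slot on the publisher using the test_decoding output plugin.
psql -h localhost -p 6001 postgres -c \
  "SELECT pg_create_logical_replication_slot('manual_slot', 'test_decoding');"

# Make a change...
psql -h localhost -p 6001 postgres -c "INSERT INTO users VALUES ('sam', 14);"

# ...and consume it from the slot in test_decoding's textual format.
psql -h localhost -p 6001 postgres -c \
  "SELECT data FROM pg_logical_slot_get_changes('manual_slot', NULL, NULL);"

# Drop the slot when done; an unconsumed slot retains WAL indefinitely.
psql -h localhost -p 6001 postgres -c \
  "SELECT pg_drop_replication_slot('manual_slot');"
```

For an actual subscription, `CREATE SUBSCRIPTION ... WITH (create_slot = false, slot_name = 'manual_slot')` is how you point at a manually created slot, though built-in logical replication still expects that slot to use the pgoutput plugin.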
To reiterate: if you don't need to override the output plugin, you probably don't need to call pg_create_logical_replication_slot directly.

Each output plugin sets up callbacks for the WAL decoder in its own _PG_output_plugin_init implementation. One of these callbacks is change_cb, which handles INSERTs, UPDATEs, and DELETEs. So we will add a log line (with elog) to pgoutput_change to learn when we have decoded an INSERT, UPDATE, or DELETE and are about to send that logical change to the subscriber.

```diff
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 99518c6b6d..7719d5a622 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1556,14 +1556,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	switch (action)
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
+			elog(LOG, "SENDING INSERT!");
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary, relentry->columns);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
+			elog(LOG, "SENDING UPDATE!");
 			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
 									new_slot, data->binary, relentry->columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
+			elog(LOG, "SENDING DELETE!");
 			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
 									data->binary, relentry->columns);
 			break;
```

## Hooking on receive

The subscription side, in the apply worker process, is simpler. There is a loop that interprets each message the publisher sends to the subscribing worker process. An INSERT is one type of message. The worker calls apply_handle_insert to handle an INSERT message.
So we can add a log in that method, as well as in the equivalent apply_handle_delete and apply_handle_update methods, to learn about each time we receive an INSERT, UPDATE, or DELETE on the subscriber side.

```diff
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d091a1dd27..44d921ba5a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2427,6 +2427,7 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	elog(LOG, "GOT AN INSERT!");
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -2607,6 +2608,7 @@ apply_handle_update(StringInfo s)
 								has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
+	elog(LOG, "GOT AN UPDATE!");
 	/* For a partitioned table, apply update to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -2762,6 +2764,7 @@ apply_handle_delete(StringInfo s)
 	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
+	elog(LOG, "GOT A DELETE!");
 	/* For a partitioned table, apply delete to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
```

Then rebuild Postgres with `make && make install` so we can observe these logs.

## Trying it out

Let's shut down, delete, recreate, and start both databases.

```
# Shut down both databases
$ ./build/bin/pg_ctl -D testdb1 -l logfile1 stop
waiting for server to shut down.... done
server stopped
$ ./build/bin/pg_ctl -D testdb2 -l logfile2 stop
waiting for server to shut down....
```
```
done
server stopped

# Delete both databases
$ rm -rf testdb1 testdb2 logfile1 logfile2

# Recreate both databases
$ ./build/bin/initdb testdb1
$ ./build/bin/initdb testdb2
$ printf '\nport = 6002\nwal_level = logical' >> testdb2/postgresql.conf
$ printf '\nport = 6001\nwal_level = logical' >> testdb1/postgresql.conf

# Start both databases
$ ./build/bin/pg_ctl -D testdb2 -l logfile2 start
waiting for server to start.... done
server started
$ ./build/bin/pg_ctl -D testdb1 -l logfile1 start
waiting for server to start.... done
server started
```

Now tail both log files so we can see the new logs we added.

```
$ tail -f logfile1 logfile2
==> logfile1 <==
...

==> logfile2 <==
...
```
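The transcript above cuts off here, so as a sketch: after re-running the CREATE TABLE / CREATE PUBLICATION / CREATE SUBSCRIPTION steps from earlier against the rebuilt databases, each kind of write on testdb1 should produce a matching pair of log lines in the two files we're tailing.

```shell
# Trigger one of each kind of change on the publisher.
psql -h localhost -p 6001 postgres -c "INSERT INTO users VALUES ('deb', 12);"
psql -h localhost -p 6001 postgres -c "UPDATE users SET age = 13 WHERE name = 'deb';"
psql -h localhost -p 6001 postgres -c "DELETE FROM users WHERE name = 'deb';"

# Expected (with the elog patches above): logfile1 should show the
# SENDING INSERT!/UPDATE!/DELETE! lines and logfile2 the matching
# GOT AN INSERT!/UPDATE!/A DELETE! lines.
```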