This repository was archived by the owner on Jan 28, 2026. It is now read-only.

always trigger db export when window interval passes #39

Merged
brunocalza merged 1 commit into main from bcalza/improvstream
Mar 6, 2024

Conversation

@brunocalza
Contributor

@brunocalza brunocalza commented Feb 29, 2024

This PR changes when database changes are exported and uploaded. See: ENG-784

note: upgrading duckdb seems to have fixed ENG-793

@dtbuchholz do you mind testing these changes and seeing if you can find any issues?

Signed-off-by: Bruno Calza <brunoangelicalza@gmail.com>
@brunocalza brunocalza self-assigned this Feb 29, 2024
return err
}

bp := vaultsprovider.New(cfg.Vaults[vault].ProviderHost)
Contributor Author

the changes here are just to put the variable declaration closer to where it is used

github.com/jackc/pgx/v5 v5.4.3
github.com/lib/pq v1.2.0
-github.com/marcboeker/go-duckdb v1.4.4
+github.com/marcboeker/go-duckdb v1.6.1
Contributor Author

upgrades duckdb, i think this solves ENG-793

// Close closes the current db.
-func (dbm *DBManager) Close() error {
-	return dbm.db.Close()
+func (dbm *DBManager) Close() {
Contributor Author

stops the goroutine

ticker := time.NewTicker(dbm.windowInterval)
dbm.close = make(chan struct{})

go func() {
Contributor Author

spawns a goroutine that calls dbm.replace every windowInterval. replace swaps out the current database where data is being aggregated, exports it to parquet, uploads it, and creates a new database for the next window

for {
select {
case <-ticker.C:
dbm.mu.Lock()
Contributor Author

we're locking the database during this operation, so that Replay is blocked from modifying the database

// a new one. The current db is exported and uploaded before
// new db is ready to be used.
func (dbm *DBManager) Replay(ctx context.Context, tx *pgrepl.Tx) error {
dbm.mu.Lock()
Contributor Author

While Replay is being executed, we can't trigger replace

// Upload the exported parquet file
if err := dbm.UploadAt(ctx, exportAt); err != nil {
fmt.Println("upload error, skipping", "err", err)
if !isEmpty {
Contributor Author

upload only if database is not empty

} else {
slog.Info("backing up current db")
}
var n int
Contributor Author

this query helps us figure out if the database is empty or not, so we don't upload an empty parquet file


func importLocalDB(t *testing.T, file *os.File) *sql.Rows {
-db, err := sql.Open("duckdb", path.Join(t.TempDir(), "temp.db"))
+db, err := sql.Open("duckdb", path.Join(t.TempDir(), "tmp.db"))
Contributor Author

for some reason i had to do this for tests to pass

@@ -5,14 +5,15 @@ import (
"database/sql"
Contributor Author

most of the changes in this file just move functions around, which is why the diff is big. the main changes are:

  • introduces a goroutine that triggers replace when window interval ticks
  • adds a check to see if database is empty

Contributor

sounds good!

@brunocalza brunocalza requested a review from avichalp February 29, 2024 20:02
@brunocalza brunocalza marked this pull request as ready for review February 29, 2024 20:05
Contributor

@avichalp avichalp left a comment

looks good!

@dtbuchholz
Contributor

@brunocalza this worked great / exactly how i'd expect it to work!

for reference, this is a log for a stream with a 5 second window:

> vaults stream \
--private-key "$(cat .wallet)" \
test_pg.stream_5
2024/03/01 10:24:37 INFO created new db at=/Users/dtb/.vaults/test_pg.stream_5/1709317477559109000.db
2024/03/01 10:24:37 INFO applying create query="CREATE TABLE IF NOT EXISTS stream_5 (id integer,val varchar,fl double)"
2024/03/01 10:24:37 INFO Logical replication started slot=basin_stream_5
2024/03/01 10:24:39 INFO new transaction received
2024/03/01 10:24:39 INFO replaying query="insert into stream_5 (id, val, fl) values (1, 'test', 1.1)"
2024/03/01 10:24:39 INFO transaction acked
2024/03/01 10:24:42 INFO window interval passed
2024/03/01 10:24:42 INFO backing up current db
2024/03/01 10:24:42 INFO closing current db
2024/03/01 10:24:45 INFO deleting db parquet export at=/Users/dtb/.vaults/test_pg.stream_5/1709317477559109000.db.parquet
2024/03/01 10:24:45 INFO deleting db dump at=/Users/dtb/.vaults/test_pg.stream_5/1709317477559109000.db
2024/03/01 10:24:45 INFO deleting db wal at=/Users/dtb/.vaults/test_pg.stream_5/1709317477559109000.db.wal
2024/03/01 10:24:45 INFO created new db at=/Users/dtb/.vaults/test_pg.stream_5/1709317485923334000.db
2024/03/01 10:24:45 INFO applying create query="CREATE TABLE IF NOT EXISTS stream_5 (id integer,val varchar,fl double)"
2024/03/01 10:24:47 INFO window interval passed
2024/03/01 10:24:47 INFO backing up current db
2024/03/01 10:24:47 INFO closing current db
2024/03/01 10:24:47 INFO deleting db dump at=/Users/dtb/.vaults/test_pg.stream_5/1709317485923334000.db
2024/03/01 10:24:47 INFO deleting db wal at=/Users/dtb/.vaults/test_pg.stream_5/1709317485923334000.db.wal
2024/03/01 10:24:47 INFO created new db at=/Users/dtb/.vaults/test_pg.stream_5/1709317487620885000.db
2024/03/01 10:24:47 INFO applying create query="CREATE TABLE IF NOT EXISTS stream_5 (id integer,val varchar,fl double)"
2024/03/01 10:24:52 INFO window interval passed
2024/03/01 10:24:52 INFO backing up current db
2024/03/01 10:24:52 INFO closing current db
2024/03/01 10:24:52 INFO deleting db dump at=/Users/dtb/.vaults/test_pg.stream_5/1709317487620885000.db
2024/03/01 10:24:52 INFO deleting db wal at=/Users/dtb/.vaults/test_pg.stream_5/1709317487620885000.db.wal
2024/03/01 10:24:52 INFO created new db at=/Users/dtb/.vaults/test_pg.stream_5/1709317492619785000.db
2024/03/01 10:24:52 INFO applying create query="CREATE TABLE IF NOT EXISTS stream_5 (id integer,val varchar,fl double)"
2024/03/01 10:24:57 INFO window interval passed
2024/03/01 10:24:57 INFO backing up current db
2024/03/01 10:24:57 INFO closing current db
2024/03/01 10:24:57 INFO deleting db dump at=/Users/dtb/.vaults/test_pg.stream_5/1709317492619785000.db
2024/03/01 10:24:57 INFO deleting db wal at=/Users/dtb/.vaults/test_pg.stream_5/1709317492619785000.db.wal
2024/03/01 10:24:57 INFO created new db at=/Users/dtb/.vaults/test_pg.stream_5/1709317497620338000.db
2024/03/01 10:24:57 INFO applying create query="CREATE TABLE IF NOT EXISTS stream_5 (id integer,val varchar,fl double)"
2024/03/01 10:24:59 INFO new transaction received
2024/03/01 10:24:59 INFO replaying query="insert into stream_5 (id, val, fl) values (2, 'test', 1.2)"
2024/03/01 10:24:59 INFO transaction acked
2024/03/01 10:25:02 INFO window interval passed
2024/03/01 10:25:02 INFO backing up current db
2024/03/01 10:25:02 INFO closing current db
2024/03/01 10:25:05 INFO deleting db parquet export at=/Users/dtb/.vaults/test_pg.stream_5/1709317497620338000.db.parquet
2024/03/01 10:25:05 INFO deleting db dump at=/Users/dtb/.vaults/test_pg.stream_5/1709317497620338000.db
2024/03/01 10:25:05 INFO deleting db wal at=/Users/dtb/.vaults/test_pg.stream_5/1709317497620338000.db.wal
2024/03/01 10:25:05 INFO created new db at=/Users/dtb/.vaults/test_pg.stream_5/1709317505768314000.db
2024/03/01 10:25:05 INFO applying create query="CREATE TABLE IF NOT EXISTS stream_5 (id integer,val varchar,fl double)"
2024/03/01 10:25:07 INFO window interval passed
2024/03/01 10:25:07 INFO backing up current db
2024/03/01 10:25:07 INFO closing current db
2024/03/01 10:25:07 INFO deleting db dump at=/Users/dtb/.vaults/test_pg.stream_5/1709317505768314000.db
2024/03/01 10:25:07 INFO deleting db wal at=/Users/dtb/.vaults/test_pg.stream_5/1709317505768314000.db.wal
2024/03/01 10:25:07 INFO created new db at=/Users/dtb/.vaults/test_pg.stream_5/1709317507621172000.db
2024/03/01 10:25:07 INFO applying create query="CREATE TABLE IF NOT EXISTS stream_5 (id integer,val varchar,fl double)"
2024/03/01 10:25:12 INFO window interval passed
2024/03/01 10:25:12 INFO backing up current db
2024/03/01 10:25:12 INFO closing current db
2024/03/01 10:25:12 INFO deleting db dump at=/Users/dtb/.vaults/test_pg.stream_5/1709317507621172000.db
2024/03/01 10:25:12 INFO deleting db wal at=/Users/dtb/.vaults/test_pg.stream_5/1709317507621172000.db.wal
2024/03/01 10:25:12 INFO created new db at=/Users/dtb/.vaults/test_pg.stream_5/1709317512621212000.db
2024/03/01 10:25:12 INFO applying create query="CREATE TABLE IF NOT EXISTS stream_5 (id integer,val varchar,fl double)"

@dtbuchholz
Contributor

oh, and i can confirm the db streaming parquet file format now works with the parquet CLI tool i was previously using. thus, basin file uploads and db streams have the same parquet format 👍

@brunocalza brunocalza merged commit 30b6b5e into main Mar 6, 2024
