always trigger db export when window interval passes #39
Signed-off-by: Bruno Calza <brunoangelicalza@gmail.com>
| return err | ||
| } | ||
|
|
||
| bp := vaultsprovider.New(cfg.Vaults[vault].ProviderHost) |
the changes here just move the variable declaration closer to where it is used
| github.com/jackc/pgx/v5 v5.4.3 | ||
| github.com/lib/pq v1.2.0 | ||
| github.com/marcboeker/go-duckdb v1.4.4 | ||
| github.com/marcboeker/go-duckdb v1.6.1 |
upgrades go-duckdb from v1.4.4 to v1.6.1; i think this solves ENG-793
| // Close closes the current db. | ||
| func (dbm *DBManager) Close() error { | ||
| return dbm.db.Close() | ||
| func (dbm *DBManager) Close() { |
stops the goroutine
| ticker := time.NewTicker(dbm.windowInterval) | ||
| dbm.close = make(chan struct{}) | ||
|
|
||
| go func() { |
spawns a goroutine that calls dbm.replace every windowInterval. replace swaps out the current database (where data is being aggregated), exports it to parquet, uploads it, and creates a new database for the next window
| for { | ||
| select { | ||
| case <-ticker.C: | ||
| dbm.mu.Lock() |
we lock the database during this operation so that Replay can't modify it in the meantime
| // a new one. The current db is exported and uploaded before | ||
| // new db is ready to be used. | ||
| func (dbm *DBManager) Replay(ctx context.Context, tx *pgrepl.Tx) error { | ||
| dbm.mu.Lock() |
While Replay is being executed, we can't trigger replace
| // Upload the exported parquet file | ||
| if err := dbm.UploadAt(ctx, exportAt); err != nil { | ||
| fmt.Println("upload error, skipping", "err", err) | ||
| if !isEmpty { |
upload only if database is not empty
| } else { | ||
| slog.Info("backing up current db") | ||
| } | ||
| var n int |
this query helps us figure out if the database is empty or not, so we don't upload an empty parquet file
|
|
||
| func importLocalDB(t *testing.T, file *os.File) *sql.Rows { | ||
| db, err := sql.Open("duckdb", path.Join(t.TempDir(), "temp.db")) | ||
| db, err := sql.Open("duckdb", path.Join(t.TempDir(), "tmp.db")) |
for some reason i had to do this for tests to pass
| @@ -5,14 +5,15 @@ import ( | |||
| "database/sql" | |||
most of the changes in this file are just moving functions around, that's why the diff is big. but the main changes are:
- introduces a goroutine that triggers replace when window interval ticks
- adds a check to see if database is empty
@brunocalza this worked great / exactly how i'd expect it to work! for reference, this is a log for a stream with a 5 second window: |
oh, and i can confirm the db streaming parquet file format now works with the |
This PR changes when database changes are exported and uploaded: the export is now always triggered when the window interval passes. See: ENG-784
note: by upgrading duckdb, it seems that ENG-793 got fixed
@dtbuchholz do you mind testing these changes to see if you can find any issues?