Skip to content

Commit

Permalink
fix(postgresql): Table initialization is now aware of schema name
Browse files Browse the repository at this point in the history
  • Loading branch information
NickTsitlakidis committed Nov 30, 2024
1 parent 446631d commit a6da091
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 39 deletions.
46 changes: 23 additions & 23 deletions libs/postgresql/src/lib/table-initializer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ describe("TableInitializer", () => {
await tableInitializer.onApplicationBootstrap();

const [hasAggregatesTable, hasEventsTable] = await Promise.all([
knexConnection.schema.hasTable("es_aggregates"),
knexConnection.schema.hasTable("es_events")
knexConnection.schema.withSchema("event_nest_tests").hasTable("es_aggregates"),
knexConnection.schema.withSchema("event_nest_tests").hasTable("es_events")
]);

expect(hasAggregatesTable).toBe(false);
Expand All @@ -56,27 +56,27 @@ describe("TableInitializer", () => {
await tableInitializer.onApplicationBootstrap();

const [hasAggregatesTable, hasEventsTable] = await Promise.all([
knexConnection.schema.hasTable("es_aggregates"),
knexConnection.schema.hasTable("es_events")
knexConnection.schema.withSchema("event_nest_tests").hasTable("es_aggregates"),
knexConnection.schema.withSchema("event_nest_tests").hasTable("es_events")
]);

expect(hasAggregatesTable).toBe(true);
expect(hasEventsTable).toBe(true);

const columnChecks = await Promise.all([
knexConnection.schema.hasColumn("es_events", "id"),
knexConnection.schema.hasColumn("es_events", "aggregate_root_id"),
knexConnection.schema.hasColumn("es_events", "aggregate_root_version"),
knexConnection.schema.hasColumn("es_events", "aggregate_root_name"),
knexConnection.schema.hasColumn("es_events", "event_name"),
knexConnection.schema.hasColumn("es_events", "payload"),
knexConnection.schema.hasColumn("es_events", "created_at")
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_events", "id"),
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_events", "aggregate_root_id"),
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_events", "aggregate_root_version"),
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_events", "aggregate_root_name"),
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_events", "event_name"),
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_events", "payload"),
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_events", "created_at")
]);

expect(columnChecks.every(Boolean)).toBe(true);

await knexConnection.schema.dropTable("es_events");
await knexConnection.schema.dropTable("es_aggregates");
await knexConnection.schema.withSchema("event_nest_tests").dropTable("es_events");
await knexConnection.schema.withSchema("event_nest_tests").dropTable("es_aggregates");
});

test("creates aggregates table when it's missing", async () => {
Expand All @@ -89,22 +89,22 @@ describe("TableInitializer", () => {
await tableInitializer.onApplicationBootstrap();

const [hasAggregatesTable, hasEventsTable] = await Promise.all([
knexConnection.schema.hasTable("es_aggregates"),
knexConnection.schema.hasTable("es_events")
knexConnection.schema.withSchema("event_nest_tests").hasTable("es_aggregates"),
knexConnection.schema.withSchema("event_nest_tests").hasTable("es_events")
]);

expect(hasAggregatesTable).toBe(true);
expect(hasEventsTable).toBe(true);

const columnChecks = await Promise.all([
knexConnection.schema.hasColumn("es_aggregates", "id"),
knexConnection.schema.hasColumn("es_aggregates", "version")
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_aggregates", "id"),
knexConnection.schema.withSchema("event_nest_tests").hasColumn("es_aggregates", "version")
]);

expect(columnChecks.every(Boolean)).toBe(true);

await knexConnection.schema.dropTable("es_events");
await knexConnection.schema.dropTable("es_aggregates");
await knexConnection.schema.withSchema("event_nest_tests").dropTable("es_events");
await knexConnection.schema.withSchema("event_nest_tests").dropTable("es_aggregates");
});

test("does not recreate tables when they already exist", async () => {
Expand All @@ -118,15 +118,15 @@ describe("TableInitializer", () => {
await tableInitializer.onApplicationBootstrap();

const [hasAggregatesTable, hasEventsTable] = await Promise.all([
knexConnection.schema.hasTable("es_aggregates"),
knexConnection.schema.hasTable("es_events")
knexConnection.schema.withSchema("event_nest_tests").hasTable("es_aggregates"),
knexConnection.schema.withSchema("event_nest_tests").hasTable("es_events")
]);

expect(hasAggregatesTable).toBe(true);
expect(hasEventsTable).toBe(true);

await knexConnection.schema.dropTable("es_events");
await knexConnection.schema.dropTable("es_aggregates");
await knexConnection.schema.withSchema("event_nest_tests").dropTable("es_events");
await knexConnection.schema.withSchema("event_nest_tests").dropTable("es_aggregates");
});
});
});
43 changes: 27 additions & 16 deletions libs/postgresql/src/lib/table-initializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,44 @@ export class TableInitializer implements OnApplicationBootstrap {

try {
const [hasAggregatesTable, hasEventsTable] = await Promise.all([
this._knexConnection.schema.hasTable(this._schemaConfiguration.aggregatesTable),
this._knexConnection.schema.hasTable(this._schemaConfiguration.eventsTable)
this._knexConnection.schema
.withSchema(this._schemaConfiguration.schema)
.hasTable(this._schemaConfiguration.aggregatesTable),
this._knexConnection.schema
.withSchema(this._schemaConfiguration.schema)
.hasTable(this._schemaConfiguration.eventsTable)
]);

if (hasAggregatesTable) {
this._logger.log("Skipping aggregates table initialization. Table already exists");
} else {
await this._knexConnection.schema.createTable(this._schemaConfiguration.aggregatesTable, (table) => {
table.uuid("id").primary();
table.integer("version").notNullable();
});
await this._knexConnection.schema
.withSchema(this._schemaConfiguration.schema)
.createTable(this._schemaConfiguration.aggregatesTable, (table) => {
table.uuid("id").primary();
table.integer("version").notNullable();
});
this._logger.log("Aggregates table created successfully");
}

if (hasEventsTable) {
this._logger.log("Skipping events table initialization. Table already exists");
} else {
await this._knexConnection.schema.createTable(this._schemaConfiguration.eventsTable, (table) => {
table.uuid("id").primary();
table.uuid("aggregate_root_id").notNullable();
table.integer("aggregate_root_version").notNullable();
table.text("aggregate_root_name").notNullable();
table.text("event_name").notNullable();
table.jsonb("payload").notNullable();
table.timestamp("created_at", { useTz: true }).notNullable();
table.foreign("aggregate_root_id").references(`${this._schemaConfiguration.aggregatesTable}.id`);
});
await this._knexConnection.schema
.withSchema(this._schemaConfiguration.schema)
.createTable(this._schemaConfiguration.eventsTable, (table) => {
table.uuid("id").primary();
table.uuid("aggregate_root_id").notNullable();
table.integer("aggregate_root_version").notNullable();
table.text("aggregate_root_name").notNullable();
table.text("event_name").notNullable();
table.jsonb("payload").notNullable();
table.timestamp("created_at", { useTz: true }).notNullable();
table
.foreign("aggregate_root_id")
.references(`id`)
.inTable(this._schemaConfiguration.schemaAwareAggregatesTable);
});
this._logger.log("Events table created successfully");
}
} catch (error) {
Expand Down

0 comments on commit a6da091

Please sign in to comment.