Compare commits

...

16 Commits

Author SHA1 Message Date
mertalev
3af4b8d7a7 sync add 2026-02-14 07:08:25 -05:00
mertalev
1989a8bec2 fix bottleneck 2026-02-14 06:28:09 -05:00
mertalev
0e1f61176a better serial queue handling 2026-02-14 04:42:29 -05:00
mertalev
0160e6fd5f fix bottleneck 2026-02-14 04:14:52 -05:00
mertalev
839fb61340 fix copy race 2026-02-14 03:32:59 -05:00
mertalev
6b04fa3f94 migration 2026-02-14 03:11:55 -05:00
mertalev
947f00ac9d better debounce 2026-02-14 02:47:39 -05:00
mertalev
8bfacda3da retry 2026-02-14 02:22:42 -05:00
mertalev
b9fceeef75 fix index 2026-02-14 01:25:36 -05:00
mertalev
b05848f5ab tetris 2026-02-14 01:13:13 -05:00
mertalev
27de6e7c1a fixes 2026-02-14 00:57:08 -05:00
mertalev
880b2ab665 generate migration 2026-02-13 23:39:22 -05:00
mertalev
5a7b298d02 use COPY 2026-02-13 21:45:35 -05:00
mertalev
4ee7a39e7a refactor 2026-02-13 21:00:04 -05:00
mertalev
c5c8fc56a5 pg impl 2026-02-12 20:42:38 -05:00
mertalev
295ab7a11a sql-tools changes 2026-02-12 20:30:18 -05:00
26 changed files with 1641 additions and 416 deletions

296
pnpm-lock.yaml generated
View File

@@ -67,7 +67,7 @@ importers:
version: 24.10.13
'@vitest/coverage-v8':
specifier: ^3.0.0
version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
byte-size:
specifier: ^9.0.0
version: 9.0.1
@@ -115,10 +115,10 @@ importers:
version: 6.1.0(typescript@5.9.3)(vite@7.3.1(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
vitest:
specifier: ^3.0.0
version: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
version: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest-fetch-mock:
specifier: ^0.4.0
version: 0.4.5(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
version: 0.4.5(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
yaml:
specifier: ^2.3.1
version: 2.8.2
@@ -343,9 +343,6 @@ importers:
'@extism/extism':
specifier: 2.0.0-rc13
version: 2.0.0-rc13
'@nestjs/bullmq':
specifier: ^11.0.1
version: 11.0.4(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.13)(bullmq@5.67.3)
'@nestjs/common':
specifier: ^11.0.4
version: 11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2)
@@ -424,9 +421,6 @@ importers:
body-parser:
specifier: ^2.2.0
version: 2.2.2
bullmq:
specifier: ^5.51.0
version: 5.67.3
chokidar:
specifier: ^4.0.3
version: 4.0.3
@@ -670,7 +664,7 @@ importers:
version: 13.15.10
'@vitest/coverage-v8':
specifier: ^3.0.0
version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
eslint:
specifier: ^9.14.0
version: 9.39.2(jiti@2.6.1)
@@ -727,7 +721,7 @@ importers:
version: 6.1.0(typescript@5.9.3)(vite@7.3.1(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
vitest:
specifier: ^3.0.0
version: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
version: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
web:
dependencies:
@@ -781,7 +775,7 @@ importers:
version: 2.6.0
fabric:
specifier: ^6.5.4
version: 6.9.1
version: 6.9.1(encoding@0.1.13)
geo-coordinates-parser:
specifier: ^1.7.4
version: 1.7.4
@@ -881,7 +875,7 @@ importers:
version: 6.9.1
'@testing-library/svelte':
specifier: ^5.2.8
version: 5.3.1(svelte@5.50.0)(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
version: 5.3.1(svelte@5.50.0)(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
'@testing-library/user-event':
specifier: ^14.5.2
version: 14.6.1(@testing-library/dom@10.4.1)
@@ -905,7 +899,7 @@ importers:
version: 1.5.6
'@vitest/coverage-v8':
specifier: ^3.0.0
version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
dotenv:
specifier: ^17.0.0
version: 17.2.4
@@ -968,7 +962,7 @@ importers:
version: 7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest:
specifier: ^3.0.0
version: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
version: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
packages:
@@ -3402,52 +3396,9 @@ packages:
'@microsoft/tsdoc@0.16.0':
resolution: {integrity: sha512-xgAyonlVVS+q7Vc7qLW0UrJU7rSFcETRWsqdXZtjzRU8dF+6CkozTK4V4y1LwOX7j8r/vHphjDeMeGI4tNGeGA==}
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==}
cpu: [arm64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==}
cpu: [x64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==}
cpu: [arm64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==}
cpu: [arm]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==}
cpu: [x64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==}
cpu: [x64]
os: [win32]
'@namnode/store@0.1.0':
resolution: {integrity: sha512-4NGTldxKcmY0UuZ7OEkvCjs8ZEoeYB6M2UwMu74pdLiFMKxXbj9HdNk1Qn213bxX1O7bY5h+PLh5DZsTURZkYA==}
'@nestjs/bull-shared@11.0.4':
resolution: {integrity: sha512-VBJcDHSAzxQnpcDfA0kt9MTGUD1XZzfByV70su0W0eDCQ9aqIEBlzWRW21tv9FG9dIut22ysgDidshdjlnczLw==}
peerDependencies:
'@nestjs/common': ^10.0.0 || ^11.0.0
'@nestjs/core': ^10.0.0 || ^11.0.0
'@nestjs/bullmq@11.0.4':
resolution: {integrity: sha512-wBzK9raAVG0/6NTMdvLGM4/FQ1lsB35/pYS8L6a0SDgkTiLpd7mAjQ8R692oMx5s7IjvgntaZOuTUrKYLNfIkA==}
peerDependencies:
'@nestjs/common': ^10.0.0 || ^11.0.0
'@nestjs/core': ^10.0.0 || ^11.0.0
bullmq: ^3.0.0 || ^4.0.0 || ^5.0.0
'@nestjs/cli@11.0.16':
resolution: {integrity: sha512-P0H+Vcjki6P5160E5QnMt3Q0X5FTg4PZkP99Ig4lm/4JWqfw32j3EXv3YBTJ2DmxLwOQ/IS9F7dzKpMAgzKTGg==}
engines: {node: '>= 20.11'}
@@ -5744,9 +5695,6 @@ packages:
resolution: {integrity: sha512-bkXY9WsVpY7CvMhKSR6pZilZu9Ln5WDrKVBUXf2S443etkmEO4V58heTecXcUIsNsi4Rx8JUO4NfX1IcQl4deg==}
engines: {node: '>=18.20'}
bullmq@5.67.3:
resolution: {integrity: sha512-eeQobOJn8M0Rj8tcZCVFLrimZgJQallJH1JpclOoyut2nDNkDwTEPMVcZzLeSR2fGeIVbfJTjU96F563Qkge5A==}
bundle-name@4.1.0:
resolution: {integrity: sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q==}
engines: {node: '>=18'}
@@ -6241,10 +6189,6 @@ packages:
crelt@1.0.6:
resolution: {integrity: sha512-VQ2MBenTq1fWZUH9DJNGti7kKv6EeAuYr3cLwxUWhIu1baTaXh4Ib5W2CqHVqib4/MqbYGJqiL3Zb8GJZr3l4g==}
cron-parser@4.9.0:
resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==}
engines: {node: '>=12.0.0'}
cron@4.4.0:
resolution: {integrity: sha512-fkdfq+b+AHI4cKdhZlppHveI/mgz2qpiYxcm+t5E5TsxX7QrLS1VE0+7GENEk9z0EeGPcpSciGv6ez24duWhwQ==}
engines: {node: '>=18.x'}
@@ -9104,13 +9048,6 @@ packages:
ms@2.1.3:
resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==}
msgpackr-extract@3.0.3:
resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==}
hasBin: true
msgpackr@1.11.5:
resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==}
multer@2.0.2:
resolution: {integrity: sha512-u7f2xaZ/UG8oLXHvtF/oWTRvT44p9ecwBBqTwgJVq0+4BW1g8OW01TyMEGWBHbyMOYVHXslaut7qEQ1meATXgw==}
engines: {node: '>= 10.16.0'}
@@ -9240,10 +9177,6 @@ packages:
resolution: {integrity: sha512-rLvcdSyRCyouf6jcOIPe/BgwG/d7hKjzMKOas33/pHEr6gbq18IK9zV7DiPvzsz0oBJPme6qr6H6kGZuI9/DZg==}
engines: {node: '>= 6.13.0'}
node-gyp-build-optional-packages@5.2.2:
resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==}
hasBin: true
node-gyp-build@4.8.4:
resolution: {integrity: sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==}
hasBin: true
@@ -10753,11 +10686,6 @@ packages:
resolution: {integrity: sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==}
hasBin: true
semver@7.7.3:
resolution: {integrity: sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==}
engines: {node: '>=10'}
hasBin: true
semver@7.7.4:
resolution: {integrity: sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA==}
engines: {node: '>=10'}
@@ -15291,22 +15219,6 @@ snapshots:
dependencies:
mapbox-gl: 1.13.3
'@mapbox/node-pre-gyp@1.0.11':
dependencies:
detect-libc: 2.1.2
https-proxy-agent: 5.0.1
make-dir: 3.1.0
node-fetch: 2.7.0
nopt: 5.0.0
npmlog: 5.0.1
rimraf: 3.0.2
semver: 7.7.4
tar: 6.2.1
transitivePeerDependencies:
- encoding
- supports-color
optional: true
'@mapbox/node-pre-gyp@1.0.11(encoding@0.1.13)':
dependencies:
detect-libc: 2.1.2
@@ -15426,40 +15338,8 @@ snapshots:
'@microsoft/tsdoc@0.16.0': {}
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
optional: true
'@namnode/store@0.1.0': {}
'@nestjs/bull-shared@11.0.4(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.13)':
dependencies:
'@nestjs/common': 11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2)
'@nestjs/core': 11.1.13(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/platform-express@11.1.13)(@nestjs/websockets@11.1.13)(reflect-metadata@0.2.2)(rxjs@7.8.2)
tslib: 2.8.1
'@nestjs/bullmq@11.0.4(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.13)(bullmq@5.67.3)':
dependencies:
'@nestjs/bull-shared': 11.0.4(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.13)
'@nestjs/common': 11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2)
'@nestjs/core': 11.1.13(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/platform-express@11.1.13)(@nestjs/websockets@11.1.13)(reflect-metadata@0.2.2)(rxjs@7.8.2)
bullmq: 5.67.3
tslib: 2.8.1
'@nestjs/cli@11.0.16(@swc/core@1.15.11(@swc/helpers@0.5.17))(@types/node@24.10.13)':
dependencies:
'@angular-devkit/core': 19.2.19(chokidar@4.0.3)
@@ -16627,14 +16507,14 @@ snapshots:
dependencies:
svelte: 5.50.0
'@testing-library/svelte@5.3.1(svelte@5.50.0)(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))':
'@testing-library/svelte@5.3.1(svelte@5.50.0)(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))':
dependencies:
'@testing-library/dom': 10.4.1
'@testing-library/svelte-core': 1.0.0(svelte@5.50.0)
svelte: 5.50.0
optionalDependencies:
vite: 7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
'@testing-library/user-event@14.6.1(@testing-library/dom@10.4.1)':
dependencies:
@@ -17330,7 +17210,7 @@ snapshots:
'@vercel/oidc@3.0.5': {}
'@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))':
'@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))':
dependencies:
'@ampproject/remapping': 2.3.0
'@bcoe/v8-coverage': 1.0.2
@@ -17345,11 +17225,11 @@ snapshots:
std-env: 3.10.0
test-exclude: 7.0.1
tinyrainbow: 2.0.0
vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
transitivePeerDependencies:
- supports-color
'@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))':
'@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))':
dependencies:
'@ampproject/remapping': 2.3.0
'@bcoe/v8-coverage': 1.0.2
@@ -17364,7 +17244,7 @@ snapshots:
std-env: 3.10.0
test-exclude: 7.0.1
tinyrainbow: 2.0.0
vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
transitivePeerDependencies:
- supports-color
@@ -17987,18 +17867,6 @@ snapshots:
builtin-modules@5.0.0: {}
bullmq@5.67.3:
dependencies:
cron-parser: 4.9.0
ioredis: 5.9.2
msgpackr: 1.11.5
node-abort-controller: 3.1.1
semver: 7.7.3
tslib: 2.8.1
uuid: 11.1.0
transitivePeerDependencies:
- supports-color
bundle-name@4.1.0:
dependencies:
run-applescript: 7.1.0
@@ -18084,16 +17952,6 @@ snapshots:
caniuse-lite@1.0.30001769: {}
canvas@2.11.2:
dependencies:
'@mapbox/node-pre-gyp': 1.0.11
nan: 2.24.0
simple-get: 3.1.1
transitivePeerDependencies:
- encoding
- supports-color
optional: true
canvas@2.11.2(encoding@0.1.13):
dependencies:
'@mapbox/node-pre-gyp': 1.0.11(encoding@0.1.13)
@@ -18487,10 +18345,6 @@ snapshots:
crelt@1.0.6: {}
cron-parser@4.9.0:
dependencies:
luxon: 3.7.2
cron@4.4.0:
dependencies:
'@types/luxon': 3.7.1
@@ -19714,10 +19568,10 @@ snapshots:
extend@3.0.2: {}
fabric@6.9.1:
fabric@6.9.1(encoding@0.1.13):
optionalDependencies:
canvas: 2.11.2
jsdom: 20.0.3(canvas@2.11.2)
canvas: 2.11.2(encoding@0.1.13)
jsdom: 20.0.3(canvas@2.11.2(encoding@0.1.13))
transitivePeerDependencies:
- bufferutil
- encoding
@@ -20870,7 +20724,7 @@ snapshots:
dependencies:
argparse: 2.0.1
jsdom@20.0.3(canvas@2.11.2):
jsdom@20.0.3(canvas@2.11.2(encoding@0.1.13)):
dependencies:
abab: 2.0.6
acorn: 8.15.0
@@ -20899,7 +20753,7 @@ snapshots:
ws: 8.19.0
xml-name-validator: 4.0.0
optionalDependencies:
canvas: 2.11.2
canvas: 2.11.2(encoding@0.1.13)
transitivePeerDependencies:
- bufferutil
- supports-color
@@ -20936,36 +20790,6 @@ snapshots:
- utf-8-validate
optional: true
jsdom@26.1.0(canvas@2.11.2):
dependencies:
cssstyle: 4.6.0
data-urls: 5.0.0
decimal.js: 10.6.0
html-encoding-sniffer: 4.0.0
http-proxy-agent: 7.0.2
https-proxy-agent: 7.0.6
is-potential-custom-element-name: 1.0.1
nwsapi: 2.2.23
parse5: 7.3.0
rrweb-cssom: 0.8.0
saxes: 6.0.0
symbol-tree: 3.2.4
tough-cookie: 5.1.2
w3c-xmlserializer: 5.0.0
webidl-conversions: 7.0.0
whatwg-encoding: 3.1.1
whatwg-mimetype: 4.0.0
whatwg-url: 14.2.0
ws: 8.19.0
xml-name-validator: 5.0.0
optionalDependencies:
canvas: 2.11.2
transitivePeerDependencies:
- bufferutil
- supports-color
- utf-8-validate
optional: true
jsep@1.4.0: {}
jsesc@3.1.0: {}
@@ -22089,22 +21913,6 @@ snapshots:
ms@2.1.3: {}
msgpackr-extract@3.0.3:
dependencies:
node-gyp-build-optional-packages: 5.2.2
optionalDependencies:
'@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3
'@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3
'@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3
optional: true
msgpackr@1.11.5:
optionalDependencies:
msgpackr-extract: 3.0.3
multer@2.0.2:
dependencies:
append-field: 1.0.0
@@ -22223,11 +22031,6 @@ snapshots:
emojilib: 2.4.0
skin-tone: 2.0.0
node-fetch@2.7.0:
dependencies:
whatwg-url: 5.0.0
optional: true
node-fetch@2.7.0(encoding@0.1.13):
dependencies:
whatwg-url: 5.0.0
@@ -22236,11 +22039,6 @@ snapshots:
node-forge@1.3.3: {}
node-gyp-build-optional-packages@5.2.2:
dependencies:
detect-libc: 2.1.2
optional: true
node-gyp-build@4.8.4: {}
node-gyp@12.2.0:
@@ -23938,8 +23736,6 @@ snapshots:
semver@6.3.1: {}
semver@7.7.3: {}
semver@7.7.4: {}
send@0.19.2:
@@ -25379,9 +25175,9 @@ snapshots:
optionalDependencies:
vite: 7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest-fetch-mock@0.4.5(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)):
vitest-fetch-mock@0.4.5(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)):
dependencies:
vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2):
dependencies:
@@ -25427,51 +25223,7 @@ snapshots:
- tsx
- yaml
vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2):
dependencies:
'@types/chai': 5.2.3
'@vitest/expect': 3.2.4
'@vitest/mocker': 3.2.4(vite@7.3.1(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))
'@vitest/pretty-format': 3.2.4
'@vitest/runner': 3.2.4
'@vitest/snapshot': 3.2.4
'@vitest/spy': 3.2.4
'@vitest/utils': 3.2.4
chai: 5.3.3
debug: 4.4.3
expect-type: 1.3.0
magic-string: 0.30.21
pathe: 2.0.3
picomatch: 4.0.3
std-env: 3.10.0
tinybench: 2.9.0
tinyexec: 0.3.2
tinyglobby: 0.2.15
tinypool: 1.1.1
tinyrainbow: 2.0.0
vite: 7.3.1(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
vite-node: 3.2.4(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)
why-is-node-running: 2.3.0
optionalDependencies:
'@types/debug': 4.1.12
'@types/node': 24.10.13
happy-dom: 20.5.0
jsdom: 26.1.0(canvas@2.11.2)
transitivePeerDependencies:
- jiti
- less
- lightningcss
- msw
- sass
- sass-embedded
- stylus
- sugarss
- supports-color
- terser
- tsx
- yaml
vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2):
vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2):
dependencies:
'@types/chai': 5.2.3
'@vitest/expect': 3.2.4
@@ -25500,7 +25252,7 @@ snapshots:
'@types/debug': 4.1.12
'@types/node': 25.2.3
happy-dom: 20.5.0
jsdom: 26.1.0(canvas@2.11.2)
jsdom: 26.1.0(canvas@2.11.2(encoding@0.1.13))
transitivePeerDependencies:
- jiti
- less

View File

@@ -35,7 +35,6 @@
},
"dependencies": {
"@extism/extism": "2.0.0-rc13",
"@nestjs/bullmq": "^11.0.1",
"@nestjs/common": "^11.0.4",
"@nestjs/core": "^11.0.4",
"@nestjs/platform-express": "^11.0.4",
@@ -62,7 +61,6 @@
"async-lock": "^1.4.0",
"bcrypt": "^6.0.0",
"body-parser": "^2.2.0",
"bullmq": "^5.51.0",
"chokidar": "^4.0.3",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",

View File

@@ -1,4 +1,3 @@
import { BullModule } from '@nestjs/bullmq';
import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core';
import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule';
@@ -22,6 +21,7 @@ import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
import { repositories } from 'src/repositories';
import { AppRepository } from 'src/repositories/app.repository';
import { ConfigRepository } from 'src/repositories/config.repository';
import { JobRepository } from 'src/repositories/job.repository';
import { DatabaseRepository } from 'src/repositories/database.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
@@ -49,7 +49,7 @@ const commonMiddleware = [
const apiMiddleware = [FileUploadInterceptor, ...commonMiddleware, { provide: APP_GUARD, useClass: AuthGuard }];
const configRepository = new ConfigRepository();
const { bull, cls, database, otel } = configRepository.getEnv();
const { cls, database, otel } = configRepository.getEnv();
const commonImports = [
ClsModule.forRoot(cls.config),
@@ -57,7 +57,6 @@ const commonImports = [
OpenTelemetryModule.forRoot(otel),
];
const bullImports = [BullModule.forRoot(bull.config), BullModule.registerQueue(...bull.queues)];
export class BaseModule implements OnModuleInit, OnModuleDestroy {
constructor(
@@ -65,6 +64,7 @@ export class BaseModule implements OnModuleInit, OnModuleDestroy {
logger: LoggingRepository,
private authService: AuthService,
private eventRepository: EventRepository,
private jobRepository: JobRepository,
private queueService: QueueService,
private telemetryRepository: TelemetryRepository,
private websocketRepository: WebsocketRepository,
@@ -91,12 +91,13 @@ export class BaseModule implements OnModuleInit, OnModuleDestroy {
async onModuleDestroy() {
await this.eventRepository.emit('AppShutdown');
await this.jobRepository.onShutdown();
await teardownTelemetry();
}
}
@Module({
imports: [...bullImports, ...commonImports, ScheduleModule.forRoot()],
imports: [...commonImports, ScheduleModule.forRoot()],
controllers: [...controllers],
providers: [...common, ...apiMiddleware, { provide: IWorker, useValue: ImmichWorker.Api }],
})
@@ -137,13 +138,13 @@ export class MaintenanceModule {
}
@Module({
imports: [...bullImports, ...commonImports],
imports: [...commonImports],
providers: [...common, { provide: IWorker, useValue: ImmichWorker.Microservices }, SchedulerRegistry],
})
export class MicroservicesModule extends BaseModule {}
@Module({
imports: [...bullImports, ...commonImports],
imports: [...commonImports],
providers: [...common, ...commandsAndQuestions, SchedulerRegistry],
})
export class ImmichAdminModule implements OnModuleDestroy {

View File

@@ -569,6 +569,13 @@ export enum QueueName {
Editor = 'editor',
}
export const JobQueueStatus = {
Pending: 0,
Active: 1,
Failed: 2,
} as const;
export type JobQueueStatus = (typeof JobQueueStatus)[keyof typeof JobQueueStatus];
export enum QueueJobStatus {
Active = 'active',
Failed = 'failed',
@@ -658,6 +665,12 @@ export enum JobName {
WorkflowRun = 'WorkflowRun',
}
type JobNameValue = (typeof JobName)[keyof typeof JobName];
const names = Object.values(JobName);
export const JobCode = Object.fromEntries(names.map((key, i) => [key, i])) as Record<JobNameValue, number>;
export const JOB_CODE_TO_NAME = Object.fromEntries(names.map((key, i) => [i, key])) as Record<number, JobNameValue>;
export type JobCode = (typeof JobCode)[keyof typeof JobCode];
export enum QueueCommand {
Start = 'start',
/** @deprecated Use `updateQueue` instead */

View File

@@ -1,6 +1,4 @@
import { RegisterQueueOptions } from '@nestjs/bullmq';
import { Inject, Injectable, Optional } from '@nestjs/common';
import { QueueOptions } from 'bullmq';
import { plainToInstance } from 'class-transformer';
import { validateSync } from 'class-validator';
import { Request, Response } from 'express';
@@ -19,7 +17,6 @@ import {
ImmichWorker,
LogFormat,
LogLevel,
QueueName,
} from 'src/enum';
import { DatabaseConnectionParams, VectorExtension } from 'src/types';
import { setDifference } from 'src/utils/set';
@@ -48,11 +45,6 @@ export interface EnvData {
thirdPartySupportUrl?: string;
};
bull: {
config: QueueOptions;
queues: RegisterQueueOptions[];
};
cls: {
config: ClsModuleOptions;
};
@@ -253,19 +245,6 @@ const getEnv = (): EnvData => {
thirdPartySupportUrl: dto.IMMICH_THIRD_PARTY_SUPPORT_URL,
},
bull: {
config: {
prefix: 'immich_bull',
connection: { ...redisConfig },
defaultJobOptions: {
attempts: 1,
removeOnComplete: true,
removeOnFail: false,
},
},
queues: Object.values(QueueName).map((name) => ({ name })),
},
cls: {
config: {
middleware: {

View File

@@ -1,16 +1,30 @@
import { getQueueToken } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { ModuleRef, Reflector } from '@nestjs/core';
import { JobsOptions, Queue, Worker } from 'bullmq';
import { ClassConstructor } from 'class-transformer';
import { Kysely, sql } from 'kysely';
import { PostgresJSDialect } from 'kysely-postgres-js';
import { setTimeout } from 'node:timers/promises';
import postgres from 'postgres';
import { JobConfig } from 'src/decorators';
import { QueueJobResponseDto, QueueJobSearchDto } from 'src/dtos/queue.dto';
import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum';
import {
JOB_CODE_TO_NAME,
JobCode,
JobName,
JobQueueStatus,
JobStatus,
MetadataKey,
QueueCleanType,
QueueJobStatus,
QueueName,
} from 'src/enum';
import { ConfigRepository } from 'src/repositories/config.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { JobCounts, JobItem, JobOf } from 'src/types';
import { DB } from 'src/schema';
import { ConcurrentQueueName, JobCounts, JobItem, JobOf } from 'src/types';
import { asPostgresConnectionConfig } from 'src/utils/database';
import { getTable, InsertRow, QueueWorker, WriteBuffer } from 'src/utils/job-queue.util';
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';
type JobMapItem = {
@@ -20,10 +34,54 @@ type JobMapItem = {
label: string;
};
const SERIAL_QUEUES = [
QueueName.FacialRecognition,
QueueName.StorageTemplateMigration,
QueueName.DuplicateDetection,
QueueName.BackupDatabase,
];
export const isConcurrentQueue = (name: QueueName): name is ConcurrentQueueName => !SERIAL_QUEUES.includes(name);
const getClaimBatch = (queueName: QueueName): number => {
if (SERIAL_QUEUES.includes(queueName)) {
return 1;
}
switch (queueName) {
case QueueName.VideoConversion: {
return 1;
}
case QueueName.FaceDetection:
case QueueName.SmartSearch:
case QueueName.Ocr: {
return 2;
}
default: {
return 100; // will be clamped to slotsAvailable by the worker
}
}
};
const STATUS_FILTER = {
[QueueJobStatus.Active]: JobQueueStatus.Active,
[QueueJobStatus.Failed]: null as null, // failures are in a separate table
[QueueJobStatus.Waiting]: JobQueueStatus.Pending,
[QueueJobStatus.Complete]: null as null, // completed jobs are deleted
[QueueJobStatus.Delayed]: JobQueueStatus.Pending, // delayed = pending with future run_after
[QueueJobStatus.Paused]: JobQueueStatus.Pending, // paused queue has pending jobs
};
@Injectable()
export class JobRepository {
private workers: Partial<Record<QueueName, Worker>> = {};
private workers: Partial<Record<QueueName, QueueWorker>> = {};
private handlers: Partial<Record<JobName, JobMapItem>> = {};
private writeBuffer!: WriteBuffer;
private pool: postgres.Sql | null = null;
private db!: Kysely<DB>;
private listenConn: postgres.Sql | null = null;
private listenReady = false;
private pauseState: Partial<Record<QueueName, boolean>> = {};
constructor(
private moduleRef: ModuleRef,
@@ -82,21 +140,46 @@ export class JobRepository {
throw new ImmichStartupError(errorMessage);
}
}
this.pool = this.createPgConnection({ max: 20, connection: { synchronous_commit: 'off' } });
this.db = new Kysely<DB>({ dialect: new PostgresJSDialect({ postgres: this.pool }) });
this.writeBuffer = new WriteBuffer(
this.pool,
(queue) => this.notify(queue),
(error) => this.logger.error(`Failed to flush job write buffer: ${error}`),
);
}
startWorkers() {
const { bull } = this.configRepository.getEnv();
async startWorkers() {
// Startup sweep: reset any active jobs from a previous crash
await Promise.all(
Object.values(QueueName).map((queueName) =>
this.db
.updateTable(getTable(this.db, queueName))
.set({ status: JobQueueStatus.Pending, startedAt: null, expiresAt: null })
.where('status', '=', JobQueueStatus.Active)
.where('expiresAt', '<', sql<Date>`now()`) // needed for multi-instance safety
.execute(),
),
);
for (const queueName of Object.values(QueueName)) {
this.logger.debug(`Starting worker for queue: ${queueName}`);
this.workers[queueName] = new Worker(
this.workers[queueName] = new QueueWorker({
queueName,
(job) => this.eventRepository.emit('JobRun', queueName, job as JobItem),
{ ...bull.config, concurrency: 1 },
);
stallTimeout: 5 * 60 * 1000, // 5 min
claimBatch: getClaimBatch(queueName),
maxRetries: 5,
backoffBaseMs: 30_000,
concurrency: 1,
db: this.db,
onJob: (job) => this.eventRepository.emit('JobRun', queueName, job),
});
}
await this.setupListen();
}
async run({ name, data }: JobItem) {
run({ name, data }: JobItem) {
const item = this.handlers[name as JobName];
if (!item) {
this.logger.warn(`Skipping unknown job: "${name}"`);
@@ -113,84 +196,127 @@ export class JobRepository {
return;
}
worker.concurrency = concurrency;
worker.setConcurrency(concurrency);
}
async isActive(name: QueueName): Promise<boolean> {
const queue = this.getQueue(name);
const count = await queue.getActiveCount();
return count > 0;
const result = await this.db
.selectFrom(getTable(this.db, name))
.select('id')
.where('status', '=', JobQueueStatus.Active)
.limit(1)
.executeTakeFirst();
return result !== undefined;
}
async isPaused(name: QueueName): Promise<boolean> {
return this.getQueue(name).isPaused();
isPaused(name: QueueName): Promise<boolean> {
return Promise.resolve(this.pauseState[name] ?? false);
}
pause(name: QueueName) {
return this.getQueue(name).pause();
/**
 * Pauses a queue: records the flag in the local cache and in job_queue_meta
 * (upsert), pauses this instance's worker, then broadcasts a 'pause'
 * notification so other listening instances update their state (see onNotify).
 */
async pause(name: QueueName) {
this.pauseState[name] = true;
await this.db
.insertInto('job_queue_meta')
.values({ queueName: name, isPaused: true })
.onConflict((oc) => oc.column('queueName').doUpdateSet({ isPaused: true }))
.execute();
this.workers[name]?.pause();
await this.notify(name, 'pause');
}
resume(name: QueueName) {
return this.getQueue(name).resume();
/**
 * Resumes a queue: clears the flag in the local cache and in job_queue_meta
 * (upsert), resumes this instance's worker, then broadcasts a 'resume'
 * notification so other listening instances update their state (see onNotify).
 */
async resume(name: QueueName) {
this.pauseState[name] = false;
await this.db
.insertInto('job_queue_meta')
.values({ queueName: name, isPaused: false })
.onConflict((oc) => oc.column('queueName').doUpdateSet({ isPaused: false }))
.execute();
this.workers[name]?.resume();
await this.notify(name, 'resume');
}
empty(name: QueueName) {
return this.getQueue(name).drain();
return this.db.deleteFrom(getTable(this.db, name)).where('status', '=', JobQueueStatus.Pending).execute();
}
clear(name: QueueName, type: QueueCleanType) {
return this.getQueue(name).clean(0, 1000, type);
clear(name: QueueName, _type: QueueCleanType) {
return this.db.deleteFrom('job_failures').where('queueName', '=', name).execute();
}
getJobCounts(name: QueueName): Promise<JobCounts> {
return this.getQueue(name).getJobCounts(
'active',
'completed',
'failed',
'delayed',
'waiting',
'paused',
) as unknown as Promise<JobCounts>;
/**
 * Returns per-queue job counts in the legacy bull-style shape.
 * Pending/active rows are counted from the queue table and failures from
 * job_failures; `completed` is always 0 (completed jobs are deleted) and
 * `delayed` is always 0 (delayed jobs are reported under waiting). When the
 * queue is paused locally, waiting jobs are reported as paused instead.
 */
async getJobCounts(name: QueueName): Promise<JobCounts> {
  // run both count queries concurrently
  const [byStatus, failures] = await Promise.all([
    this.db
      .selectFrom(getTable(this.db, name))
      .select((eb) => ['status', eb.fn.countAll<number>().as('count')])
      .groupBy('status')
      .execute(),
    this.db
      .selectFrom('job_failures')
      .select((eb) => eb.fn.countAll<number>().as('count'))
      .where('queueName', '=', name)
      .executeTakeFirst(),
  ]);
  const counts: JobCounts = {
    active: 0,
    completed: 0,
    failed: Number(failures?.count ?? 0),
    delayed: 0,
    waiting: 0,
    paused: 0,
  };
  for (const row of byStatus) {
    if (row.status === JobQueueStatus.Pending) {
      counts.waiting = Number(row.count);
    } else if (row.status === JobQueueStatus.Active) {
      counts.active = Number(row.count);
    }
  }
  if (this.pauseState[name]) {
    // a paused queue reports its backlog as paused, not waiting
    counts.paused = counts.waiting;
    counts.waiting = 0;
  }
  return counts;
}
/** Looks up the queue a job name was registered to; assumes the handler exists. */
private getQueueName(name: JobName) {
  const handler = this.handlers[name] as JobMapItem;
  return handler.queueName;
}
async queueAll(items: JobItem[]): Promise<void> {
queueAll(items: JobItem[]): void {
if (items.length === 0) {
return;
}
const promises = [];
const itemsByQueue = {} as Record<string, (JobItem & { data: any; options: JobsOptions | undefined })[]>;
const bufferItems: { queue: QueueName; row: InsertRow }[] = [];
for (const item of items) {
const queueName = this.getQueueName(item.name);
const job = {
name: item.name,
data: item.data || {},
options: this.getJobOptions(item) || undefined,
} as JobItem & { data: any; options: JobsOptions | undefined };
if (job.options?.jobId) {
// need to use add() instead of addBulk() for jobId deduplication
promises.push(this.getQueue(queueName).add(item.name, item.data, job.options));
} else {
itemsByQueue[queueName] = itemsByQueue[queueName] || [];
itemsByQueue[queueName].push(job);
}
const options = this.getJobOptions(item);
bufferItems.push({
queue: queueName,
row: {
code: JobCode[item.name],
data: item.data ?? null,
priority: options?.priority ?? null,
dedupKey: options?.dedupKey ?? null,
runAfter: options?.delay ? new Date(Date.now() + options.delay) : null,
},
});
}
for (const [queueName, jobs] of Object.entries(itemsByQueue)) {
const queue = this.getQueue(queueName as QueueName);
promises.push(queue.addBulk(jobs));
}
await Promise.all(promises);
this.writeBuffer.add(bufferItems);
}
async queue(item: JobItem): Promise<void> {
return this.queueAll([item]);
/** Enqueues a single job; convenience wrapper around queueAll. */
queue(item: JobItem): void {
this.queueAll([item]);
}
async waitForQueueCompletion(...queues: QueueName[]): Promise<void> {
@@ -209,29 +335,76 @@ export class JobRepository {
}
async searchJobs(name: QueueName, dto: QueueJobSearchDto): Promise<QueueJobResponseDto[]> {
const jobs = await this.getQueue(name).getJobs(dto.status ?? Object.values(QueueJobStatus), 0, 1000);
return jobs.map((job) => {
const { id, name, timestamp, data } = job;
return { id, name: name as JobName, timestamp, data };
});
const requestedStatuses = dto.status ?? Object.values(QueueJobStatus);
const includeFailed = requestedStatuses.includes(QueueJobStatus.Failed);
const statuses: JobQueueStatus[] = [];
for (const status of requestedStatuses) {
const mapped = STATUS_FILTER[status];
if (mapped !== null && !statuses.includes(mapped)) {
statuses.push(mapped);
}
}
const results: QueueJobResponseDto[] = [];
if (statuses.length > 0) {
const rows = await this.db
.selectFrom(getTable(this.db, name))
.select(['id', 'code', 'data', 'runAfter'])
.where('status', 'in', statuses)
.orderBy('id', 'desc')
.limit(1000)
.execute();
for (const row of rows) {
results.push({
id: String(row.id),
name: JOB_CODE_TO_NAME[row.code],
data: (row.data ?? {}) as object,
timestamp: new Date(row.runAfter).getTime(),
});
}
}
if (includeFailed) {
const failedRows = await this.db
.selectFrom('job_failures')
.select(['id', 'code', 'data', 'failedAt'])
.where('queueName', '=', name)
.orderBy('id', 'desc')
.limit(1000)
.execute();
for (const row of failedRows) {
results.push({
id: `f-${row.id}`,
name: JOB_CODE_TO_NAME[row.code],
data: (row.data ?? {}) as object,
timestamp: new Date(row.failedAt).getTime(),
});
}
}
return results;
}
private getJobOptions(item: JobItem): JobsOptions | null {
private getJobOptions(item: JobItem): { dedupKey?: string; priority?: number; delay?: number } | null {
switch (item.name) {
case JobName.NotifyAlbumUpdate: {
return {
jobId: `${item.data.id}/${item.data.recipientId}`,
dedupKey: `${item.data.id}/${item.data.recipientId}`,
delay: item.data?.delay,
};
}
case JobName.StorageTemplateMigrationSingle: {
return { jobId: item.data.id };
return { dedupKey: item.data.id };
}
case JobName.PersonGenerateThumbnail: {
return { priority: 1 };
}
case JobName.FacialRecognitionQueueAll: {
return { jobId: JobName.FacialRecognitionQueueAll };
return { dedupKey: JobName.FacialRecognitionQueueAll };
}
default: {
return null;
@@ -239,16 +412,110 @@ export class JobRepository {
}
}
private getQueue(queue: QueueName): Queue {
return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false });
/**
 * Opens a new postgres.js connection pool using the app database config.
 * Defaults to a single connection unless options.max is given; extra
 * server connection parameters (e.g. synchronous_commit) pass through.
 */
private createPgConnection(options?: { max?: number; connection?: Record<string, string> }) {
  const { max = 1, connection } = options ?? {};
  const { database } = this.configRepository.getEnv();
  const config = asPostgresConnectionConfig(database.config);
  return postgres({
    host: config.host,
    port: config.port,
    username: config.username,
    password: config.password as string | undefined,
    database: config.database,
    ssl: config.ssl as boolean | undefined,
    max,
    connection,
  });
}
/** @deprecated */
// todo: remove this when asset notifications no longer need it.
public async removeJob(name: JobName, jobID: string): Promise<void> {
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobID);
if (existingJob) {
await existingJob.remove();
/**
 * (Re)creates the dedicated LISTEN connection and subscribes to every queue's
 * notification channel. Any previous listen connection is closed first. After
 * subscribing, pause state is re-synced from the database and every worker is
 * poked so jobs queued while we were not listening still get picked up.
 */
private async setupListen(): Promise<void> {
if (this.listenConn) {
// tear down any previous listener before re-subscribing
await this.listenConn.end();
this.listenConn = null;
}
this.listenConn = this.createPgConnection();
for (const queueName of Object.values(QueueName)) {
await this.listenConn.listen(
`jobs:${queueName}`,
(payload) => this.onNotify(queueName, payload),
() => this.onReconnect(),
);
}
this.listenReady = true;
// reconcile pause flags and wake workers in case anything changed while deaf
await this.syncPauseState();
for (const worker of Object.values(this.workers)) {
worker.onNotification();
}
}
/**
 * Handles a NOTIFY payload for a queue: 'pause'/'resume' toggle the local
 * pause state and worker; any other payload (including empty) just signals
 * that new work may be available.
 */
private onNotify(queueName: QueueName, payload: string) {
  const worker = this.workers[queueName];
  if (payload === 'pause') {
    this.pauseState[queueName] = true;
    worker?.pause();
  } else if (payload === 'resume') {
    this.pauseState[queueName] = false;
    worker?.resume();
  } else {
    worker?.onNotification();
  }
}
/**
 * Invoked by postgres.js when the LISTEN connection is re-established.
 * Re-syncs pause state and pokes every worker, guarding against overlapping
 * reconnects with the listenReady flag.
 *
 * Bug fix: the previous version had no rejection handler, so a failing
 * syncPauseState() surfaced as an unhandled rejection AND left listenReady
 * false forever, making every future reconnect a no-op. Errors are now
 * logged and listenReady is always restored so the next reconnect retries.
 */
private onReconnect() {
  if (!this.listenReady) {
    return;
  }
  this.listenReady = false;
  this.logger.log('LISTEN connection re-established, syncing state');
  void this.syncPauseState()
    .then(() => {
      for (const worker of Object.values(this.workers)) {
        worker.onNotification();
      }
    })
    .catch((error) => this.logger.error(`Failed to sync state after reconnect: ${error}`))
    .finally(() => {
      this.listenReady = true;
    });
}
/**
 * Reloads pause flags from job_queue_meta and reconciles the in-memory cache,
 * pausing or resuming the local worker for any queue whose flag changed.
 */
private async syncPauseState(): Promise<void> {
  const rows = await this.db.selectFrom('job_queue_meta').selectAll().execute();
  for (const row of rows) {
    const queueName = row.queueName as QueueName;
    const wasPaused = this.pauseState[queueName] ?? false;
    this.pauseState[queueName] = row.isPaused;
    if (wasPaused === row.isPaused) {
      continue; // no transition, nothing to do for this queue
    }
    if (row.isPaused) {
      this.workers[queueName]?.pause();
    } else {
      this.workers[queueName]?.resume();
    }
  }
}
/**
 * Broadcasts a pg_notify on the queue's channel. An empty payload signals
 * "new work available"; 'pause'/'resume' payloads toggle state (see onNotify).
 */
private notify(queue: QueueName, payload = '') {
return sql`SELECT pg_notify(${`jobs:${queue}`}, ${payload})`.execute(this.db);
}
async onShutdown(): Promise<void> {
const shutdownPromises = Object.values(this.workers).map((worker) => worker.shutdown());
await Promise.all(shutdownPromises);
if (this.writeBuffer) {
await this.writeBuffer.flush();
}
if (this.pool) {
await this.pool.end();
this.pool = null;
}
if (this.listenConn) {
await this.listenConn.end();
this.listenConn = null;
}
}
}

View File

@@ -41,6 +41,28 @@ import { AssetTable } from 'src/schema/tables/asset.table';
import { AuditTable } from 'src/schema/tables/audit.table';
import { FaceSearchTable } from 'src/schema/tables/face-search.table';
import { GeodataPlacesTable } from 'src/schema/tables/geodata-places.table';
import {
JobFailuresTable,
JobQueueMetaTable,
JobsBackgroundTaskTable,
JobsBackupDatabaseTable,
JobsDuplicateDetectionTable,
JobsEditorTable,
JobsFaceDetectionTable,
JobsFacialRecognitionTable,
JobsLibraryTable,
JobsMetadataExtractionTable,
JobsMigrationTable,
JobsNotificationTable,
JobsOcrTable,
JobsSearchTable,
JobsSidecarTable,
JobsSmartSearchTable,
JobsStorageTemplateMigrationTable,
JobsThumbnailGenerationTable,
JobsVideoConversionTable,
JobsWorkflowTable,
} from 'src/schema/tables/job.table';
import { LibraryTable } from 'src/schema/tables/library.table';
import { MemoryAssetAuditTable } from 'src/schema/tables/memory-asset-audit.table';
import { MemoryAssetTable } from 'src/schema/tables/memory-asset.table';
@@ -135,6 +157,26 @@ export class ImmichDatabase {
WorkflowTable,
WorkflowFilterTable,
WorkflowActionTable,
JobsThumbnailGenerationTable,
JobsMetadataExtractionTable,
JobsVideoConversionTable,
JobsFaceDetectionTable,
JobsFacialRecognitionTable,
JobsSmartSearchTable,
JobsDuplicateDetectionTable,
JobsBackgroundTaskTable,
JobsStorageTemplateMigrationTable,
JobsMigrationTable,
JobsSearchTable,
JobsSidecarTable,
JobsLibraryTable,
JobsNotificationTable,
JobsBackupDatabaseTable,
JobsOcrTable,
JobsWorkflowTable,
JobsEditorTable,
JobQueueMetaTable,
JobFailuresTable,
];
functions = [
@@ -252,4 +294,25 @@ export interface DB {
workflow: WorkflowTable;
workflow_filter: WorkflowFilterTable;
workflow_action: WorkflowActionTable;
jobs_thumbnail_generation: JobsThumbnailGenerationTable;
jobs_metadata_extraction: JobsMetadataExtractionTable;
jobs_video_conversion: JobsVideoConversionTable;
jobs_face_detection: JobsFaceDetectionTable;
jobs_facial_recognition: JobsFacialRecognitionTable;
jobs_smart_search: JobsSmartSearchTable;
jobs_duplicate_detection: JobsDuplicateDetectionTable;
jobs_background_task: JobsBackgroundTaskTable;
jobs_storage_template_migration: JobsStorageTemplateMigrationTable;
jobs_migration: JobsMigrationTable;
jobs_search: JobsSearchTable;
jobs_sidecar: JobsSidecarTable;
jobs_library: JobsLibraryTable;
jobs_notification: JobsNotificationTable;
jobs_backup_database: JobsBackupDatabaseTable;
jobs_ocr: JobsOcrTable;
jobs_workflow: JobsWorkflowTable;
jobs_editor: JobsEditorTable;
job_queue_meta: JobQueueMetaTable;
job_failures: JobFailuresTable;
}

View File

@@ -0,0 +1,492 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await sql`CREATE TABLE "jobs_thumbnail_generation" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_thumbnail_generation_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_thumbnail_generation_dedup" ON "jobs_thumbnail_generation" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_thumbnail_generation_pending" ON "jobs_thumbnail_generation" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_metadata_extraction" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_metadata_extraction_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_metadata_extraction_dedup" ON "jobs_metadata_extraction" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_metadata_extraction_pending" ON "jobs_metadata_extraction" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_video_conversion" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_video_conversion_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_video_conversion_dedup" ON "jobs_video_conversion" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_video_conversion_pending" ON "jobs_video_conversion" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_face_detection" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_face_detection_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_face_detection_dedup" ON "jobs_face_detection" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_face_detection_pending" ON "jobs_face_detection" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_facial_recognition" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_facial_recognition_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_facial_recognition_dedup" ON "jobs_facial_recognition" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_facial_recognition_pending" ON "jobs_facial_recognition" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_smart_search" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_smart_search_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_smart_search_dedup" ON "jobs_smart_search" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_smart_search_pending" ON "jobs_smart_search" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_duplicate_detection" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_duplicate_detection_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_duplicate_detection_dedup" ON "jobs_duplicate_detection" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_duplicate_detection_pending" ON "jobs_duplicate_detection" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_background_task" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_background_task_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_background_task_dedup" ON "jobs_background_task" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_background_task_pending" ON "jobs_background_task" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_storage_template_migration" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_storage_template_migration_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_storage_template_migration_dedup" ON "jobs_storage_template_migration" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_storage_template_migration_pending" ON "jobs_storage_template_migration" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_migration" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_migration_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_migration_dedup" ON "jobs_migration" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_migration_pending" ON "jobs_migration" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_search" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_search_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_search_dedup" ON "jobs_search" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_search_pending" ON "jobs_search" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_sidecar" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_sidecar_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_sidecar_dedup" ON "jobs_sidecar" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_sidecar_pending" ON "jobs_sidecar" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_library" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_library_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_library_dedup" ON "jobs_library" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_library_pending" ON "jobs_library" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_notification" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_notification_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_notification_dedup" ON "jobs_notification" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_notification_pending" ON "jobs_notification" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_backup_database" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_backup_database_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_backup_database_dedup" ON "jobs_backup_database" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_backup_database_pending" ON "jobs_backup_database" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_ocr" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_ocr_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_ocr_dedup" ON "jobs_ocr" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_ocr_pending" ON "jobs_ocr" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_workflow" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_workflow_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_workflow_dedup" ON "jobs_workflow" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_workflow_pending" ON "jobs_workflow" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "jobs_editor" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"runAfter" timestamp with time zone NOT NULL DEFAULT now(),
"startedAt" timestamp with time zone,
"expiresAt" timestamp with time zone,
"code" smallint NOT NULL,
"priority" smallint NOT NULL DEFAULT 0,
"status" smallint NOT NULL DEFAULT 0,
"retries" smallint NOT NULL DEFAULT 0,
"data" jsonb,
"dedupKey" text,
CONSTRAINT "jobs_editor_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE UNIQUE INDEX "IDX_jobs_editor_dedup" ON "jobs_editor" ("dedupKey") WHERE "dedupKey" IS NOT NULL;`.execute(db);
await sql`CREATE INDEX "IDX_jobs_editor_pending" ON "jobs_editor" (priority DESC, id ASC);`.execute(db);
await sql`CREATE TABLE "job_queue_meta" (
"queueName" text NOT NULL,
"isPaused" boolean NOT NULL DEFAULT false,
CONSTRAINT "job_queue_meta_pkey" PRIMARY KEY ("queueName")
);`.execute(db);
await sql`CREATE TABLE "job_failures" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"failedAt" timestamp with time zone NOT NULL DEFAULT now(),
"queueName" text NOT NULL,
"code" smallint NOT NULL,
"data" jsonb,
"error" text,
CONSTRAINT "job_failures_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE INDEX "IDX_job_failures_queue" ON "job_failures" ("queueName");`.execute(db);
await sql`ALTER TABLE "jobs_thumbnail_generation" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_thumbnail_generation" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_thumbnail_generation" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_metadata_extraction" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_metadata_extraction" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_metadata_extraction" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_video_conversion" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_video_conversion" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_video_conversion" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_face_detection" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_face_detection" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_face_detection" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_facial_recognition" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_facial_recognition" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_facial_recognition" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_smart_search" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_smart_search" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_smart_search" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_duplicate_detection" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_duplicate_detection" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_duplicate_detection" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_background_task" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_background_task" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_background_task" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_storage_template_migration" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_storage_template_migration" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_storage_template_migration" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_migration" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_migration" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_migration" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_search" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_search" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_search" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_sidecar" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_sidecar" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_sidecar" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_library" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_library" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_library" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_notification" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_notification" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_notification" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_backup_database" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_backup_database" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_backup_database" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_ocr" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_ocr" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_ocr" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_workflow" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_workflow" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_workflow" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`ALTER TABLE "jobs_editor" SET (autovacuum_vacuum_cost_delay = 0)`.execute(db);
await sql`ALTER TABLE "jobs_editor" SET (autovacuum_vacuum_scale_factor = 0.01)`.execute(db);
await sql`ALTER TABLE "jobs_editor" SET (autovacuum_vacuum_threshold = 100)`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_thumbnail_generation_dedup', '{"type":"index","name":"IDX_jobs_thumbnail_generation_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_thumbnail_generation_dedup\\" ON \\"jobs_thumbnail_generation\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_thumbnail_generation_pending', '{"type":"index","name":"IDX_jobs_thumbnail_generation_pending","sql":"CREATE INDEX \\"IDX_jobs_thumbnail_generation_pending\\" ON \\"jobs_thumbnail_generation\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_metadata_extraction_dedup', '{"type":"index","name":"IDX_jobs_metadata_extraction_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_metadata_extraction_dedup\\" ON \\"jobs_metadata_extraction\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_metadata_extraction_pending', '{"type":"index","name":"IDX_jobs_metadata_extraction_pending","sql":"CREATE INDEX \\"IDX_jobs_metadata_extraction_pending\\" ON \\"jobs_metadata_extraction\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_video_conversion_dedup', '{"type":"index","name":"IDX_jobs_video_conversion_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_video_conversion_dedup\\" ON \\"jobs_video_conversion\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_video_conversion_pending', '{"type":"index","name":"IDX_jobs_video_conversion_pending","sql":"CREATE INDEX \\"IDX_jobs_video_conversion_pending\\" ON \\"jobs_video_conversion\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_face_detection_dedup', '{"type":"index","name":"IDX_jobs_face_detection_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_face_detection_dedup\\" ON \\"jobs_face_detection\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_face_detection_pending', '{"type":"index","name":"IDX_jobs_face_detection_pending","sql":"CREATE INDEX \\"IDX_jobs_face_detection_pending\\" ON \\"jobs_face_detection\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_facial_recognition_dedup', '{"type":"index","name":"IDX_jobs_facial_recognition_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_facial_recognition_dedup\\" ON \\"jobs_facial_recognition\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_facial_recognition_pending', '{"type":"index","name":"IDX_jobs_facial_recognition_pending","sql":"CREATE INDEX \\"IDX_jobs_facial_recognition_pending\\" ON \\"jobs_facial_recognition\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_smart_search_dedup', '{"type":"index","name":"IDX_jobs_smart_search_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_smart_search_dedup\\" ON \\"jobs_smart_search\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_smart_search_pending', '{"type":"index","name":"IDX_jobs_smart_search_pending","sql":"CREATE INDEX \\"IDX_jobs_smart_search_pending\\" ON \\"jobs_smart_search\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_duplicate_detection_dedup', '{"type":"index","name":"IDX_jobs_duplicate_detection_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_duplicate_detection_dedup\\" ON \\"jobs_duplicate_detection\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_duplicate_detection_pending', '{"type":"index","name":"IDX_jobs_duplicate_detection_pending","sql":"CREATE INDEX \\"IDX_jobs_duplicate_detection_pending\\" ON \\"jobs_duplicate_detection\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_background_task_dedup', '{"type":"index","name":"IDX_jobs_background_task_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_background_task_dedup\\" ON \\"jobs_background_task\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_background_task_pending', '{"type":"index","name":"IDX_jobs_background_task_pending","sql":"CREATE INDEX \\"IDX_jobs_background_task_pending\\" ON \\"jobs_background_task\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_storage_template_migration_dedup', '{"type":"index","name":"IDX_jobs_storage_template_migration_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_storage_template_migration_dedup\\" ON \\"jobs_storage_template_migration\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_storage_template_migration_pending', '{"type":"index","name":"IDX_jobs_storage_template_migration_pending","sql":"CREATE INDEX \\"IDX_jobs_storage_template_migration_pending\\" ON \\"jobs_storage_template_migration\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_migration_dedup', '{"type":"index","name":"IDX_jobs_migration_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_migration_dedup\\" ON \\"jobs_migration\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_migration_pending', '{"type":"index","name":"IDX_jobs_migration_pending","sql":"CREATE INDEX \\"IDX_jobs_migration_pending\\" ON \\"jobs_migration\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_search_dedup', '{"type":"index","name":"IDX_jobs_search_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_search_dedup\\" ON \\"jobs_search\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_search_pending', '{"type":"index","name":"IDX_jobs_search_pending","sql":"CREATE INDEX \\"IDX_jobs_search_pending\\" ON \\"jobs_search\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_sidecar_dedup', '{"type":"index","name":"IDX_jobs_sidecar_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_sidecar_dedup\\" ON \\"jobs_sidecar\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_sidecar_pending', '{"type":"index","name":"IDX_jobs_sidecar_pending","sql":"CREATE INDEX \\"IDX_jobs_sidecar_pending\\" ON \\"jobs_sidecar\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_library_dedup', '{"type":"index","name":"IDX_jobs_library_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_library_dedup\\" ON \\"jobs_library\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_library_pending', '{"type":"index","name":"IDX_jobs_library_pending","sql":"CREATE INDEX \\"IDX_jobs_library_pending\\" ON \\"jobs_library\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_notification_dedup', '{"type":"index","name":"IDX_jobs_notification_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_notification_dedup\\" ON \\"jobs_notification\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_notification_pending', '{"type":"index","name":"IDX_jobs_notification_pending","sql":"CREATE INDEX \\"IDX_jobs_notification_pending\\" ON \\"jobs_notification\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_backup_database_dedup', '{"type":"index","name":"IDX_jobs_backup_database_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_backup_database_dedup\\" ON \\"jobs_backup_database\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_backup_database_pending', '{"type":"index","name":"IDX_jobs_backup_database_pending","sql":"CREATE INDEX \\"IDX_jobs_backup_database_pending\\" ON \\"jobs_backup_database\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_ocr_dedup', '{"type":"index","name":"IDX_jobs_ocr_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_ocr_dedup\\" ON \\"jobs_ocr\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_ocr_pending', '{"type":"index","name":"IDX_jobs_ocr_pending","sql":"CREATE INDEX \\"IDX_jobs_ocr_pending\\" ON \\"jobs_ocr\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_workflow_dedup', '{"type":"index","name":"IDX_jobs_workflow_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_workflow_dedup\\" ON \\"jobs_workflow\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_workflow_pending', '{"type":"index","name":"IDX_jobs_workflow_pending","sql":"CREATE INDEX \\"IDX_jobs_workflow_pending\\" ON \\"jobs_workflow\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_editor_dedup', '{"type":"index","name":"IDX_jobs_editor_dedup","sql":"CREATE UNIQUE INDEX \\"IDX_jobs_editor_dedup\\" ON \\"jobs_editor\\" (\\"dedupKey\\") WHERE \\"dedupKey\\" IS NOT NULL;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('index_IDX_jobs_editor_pending', '{"type":"index","name":"IDX_jobs_editor_pending","sql":"CREATE INDEX \\"IDX_jobs_editor_pending\\" ON \\"jobs_editor\\" (priority DESC, id ASC);"}'::jsonb);`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
  // Per-queue job tables created by the corresponding `up` migration, listed
  // in the same order the original generated statements executed.
  const jobTables = [
    'jobs_thumbnail_generation',
    'jobs_metadata_extraction',
    'jobs_video_conversion',
    'jobs_face_detection',
    'jobs_facial_recognition',
    'jobs_smart_search',
    'jobs_duplicate_detection',
    'jobs_background_task',
    'jobs_storage_template_migration',
    'jobs_migration',
    'jobs_search',
    'jobs_sidecar',
    'jobs_library',
    'jobs_notification',
    'jobs_backup_database',
    'jobs_ocr',
    'jobs_workflow',
    'jobs_editor',
  ];
  // Storage parameters that `up` configured on each job table.
  const storageParameters = [
    'autovacuum_vacuum_cost_delay',
    'autovacuum_vacuum_scale_factor',
    'autovacuum_vacuum_threshold',
  ];

  // Reset the autovacuum overrides before dropping the tables.
  // BUGFIX: the parameter name must NOT be qualified with the table name —
  // Postgres rejects `RESET (jobs_x.autovacuum_...)` with
  // "unrecognized parameter namespace" (only the `toast.` namespace is
  // accepted), so the previous table-qualified form failed on the very first
  // statement and made the rollback unusable.
  for (const table of jobTables) {
    for (const parameter of storageParameters) {
      await sql`ALTER TABLE ${sql.table(table)} RESET (${sql.raw(parameter)})`.execute(db);
    }
  }

  // Drop the per-queue tables, then the shared metadata/dead-letter tables.
  for (const table of jobTables) {
    await sql`DROP TABLE ${sql.table(table)};`.execute(db);
  }
  await sql`DROP TABLE "job_queue_meta";`.execute(db);
  await sql`DROP TABLE "job_failures";`.execute(db);

  // Remove the index overrides registered by `up` (one `_dedup` and one
  // `_pending` entry per job table). The name is passed as a bind parameter.
  for (const table of jobTables) {
    for (const suffix of ['dedup', 'pending'] as const) {
      await sql`DELETE FROM "migration_overrides" WHERE "name" = ${`index_IDX_${table}_${suffix}`};`.execute(db);
    }
  }
}

View File

@@ -0,0 +1,145 @@
import { JobCode, JobQueueStatus, QueueName } from 'src/enum';
import { Column, ConfigurationParameter, Generated, Index, PrimaryColumn, Table } from 'src/sql-tools';
/**
 * Row shape shared by every per-queue `jobs_*` table; the classes produced by
 * `defineJobTable` below declare the matching columns and defaults.
 */
export type JobTable = {
  /** bigint identity primary key. */
  id: Generated<number>;
  /** Defaults to `now()` — presumably the earliest time the job may run; confirm against the queue runner. */
  runAfter: Generated<Date>;
  startedAt: Date | null;
  expiresAt: Date | null;
  /** Job type discriminator, stored as a smallint. */
  code: JobCode;
  /** Defaults to 0; the `IDX_*_pending` index orders by priority DESC, so higher values sort first. */
  priority: Generated<number>;
  /** Queue state, stored as a smallint; defaults to 0. */
  status: Generated<JobQueueStatus>;
  /** Retry counter; defaults to 0. */
  retries: Generated<number>;
  /** Arbitrary job payload (jsonb, nullable). */
  data: unknown;
  /** Optional dedup handle; covered by a partial unique index where non-null. */
  dedupKey: string | null;
};
/**
 * Row shape of the `job_failures` dead-letter table (entity: `JobFailuresTable`).
 */
export type JobFailureTable = {
  /** bigint identity primary key. */
  id: Generated<number>;
  /** Defaults to `now()`. */
  failedAt: Generated<Date>;
  /** Queue the job belonged to; indexed via `IDX_job_failures_queue`. */
  // NOTE(review): typed `string` here but `QueueName` on the JobFailuresTable
  // entity — possibly intentional leniency for reads; confirm.
  queueName: string;
  /** Job type discriminator (smallint). */
  code: JobCode;
  /** Original job payload (jsonb, nullable). */
  data: unknown;
  /** Failure message, if one was captured. */
  error: string | null;
};
/**
 * Builds the table class for one job queue.
 *
 * Every queue shares the same column layout, the same autovacuum storage
 * parameters, and the same pair of indexes (a partial unique index on
 * "dedupKey" plus a priority/id ordering index), so the class is generated
 * once per physical table name instead of being written out by hand.
 */
function defineJobTable(name: string) {
  class QueueJobTable {
    @PrimaryColumn({ type: 'bigint', identity: true })
    id!: Generated<number>;
    @Column({ type: 'timestamp with time zone', default: () => 'now()' })
    runAfter!: Generated<Date>;
    @Column({ type: 'timestamp with time zone', nullable: true })
    startedAt!: Date | null;
    @Column({ type: 'timestamp with time zone', nullable: true })
    expiresAt!: Date | null;
    @Column({ type: 'smallint' })
    code!: JobCode;
    @Column({ type: 'smallint', default: 0 })
    priority!: Generated<number>;
    @Column({ type: 'smallint', default: 0 })
    status!: Generated<JobQueueStatus>;
    @Column({ type: 'smallint', default: 0 })
    retries!: Generated<number>;
    @Column({ type: 'jsonb', nullable: true })
    data!: unknown;
    @Column({ type: 'text', nullable: true })
    dedupKey!: string | null;
  }
  // Class-level decorators are applied manually (rather than with `@` syntax)
  // so the table/index names can be parameterized. A decorator may either
  // return a replacement class or mutate in place and return nothing, hence
  // the `|| current` fallback.
  const classDecorators = [
    ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }),
    ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }),
    ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }),
    Index({
      name: `IDX_${name}_dedup`,
      columns: ['dedupKey'],
      unique: true,
      where: `"dedupKey" IS NOT NULL`,
    }),
    Index({ name: `IDX_${name}_pending`, expression: 'priority DESC, id ASC' }),
    Table(name),
  ];
  let current = QueueJobTable;
  for (const decorate of classDecorators) {
    current = decorate(current) || current;
  }
  // Expose the table name as the generated class's runtime name.
  Object.defineProperty(current, 'name', { value: name });
  return current;
}
// One concrete table class per queue; each shares the layout, storage
// parameters, and indexes declared in defineJobTable above.
export const JobsThumbnailGenerationTable = defineJobTable('jobs_thumbnail_generation');
export const JobsMetadataExtractionTable = defineJobTable('jobs_metadata_extraction');
export const JobsVideoConversionTable = defineJobTable('jobs_video_conversion');
export const JobsFaceDetectionTable = defineJobTable('jobs_face_detection');
export const JobsFacialRecognitionTable = defineJobTable('jobs_facial_recognition');
export const JobsSmartSearchTable = defineJobTable('jobs_smart_search');
export const JobsDuplicateDetectionTable = defineJobTable('jobs_duplicate_detection');
export const JobsBackgroundTaskTable = defineJobTable('jobs_background_task');
export const JobsStorageTemplateMigrationTable = defineJobTable('jobs_storage_template_migration');
export const JobsMigrationTable = defineJobTable('jobs_migration');
export const JobsSearchTable = defineJobTable('jobs_search');
export const JobsSidecarTable = defineJobTable('jobs_sidecar');
export const JobsLibraryTable = defineJobTable('jobs_library');
export const JobsNotificationTable = defineJobTable('jobs_notification');
export const JobsBackupDatabaseTable = defineJobTable('jobs_backup_database');
export const JobsOcrTable = defineJobTable('jobs_ocr');
export const JobsWorkflowTable = defineJobTable('jobs_workflow');
export const JobsEditorTable = defineJobTable('jobs_editor');
// Companion instance types, so each exported name works both as the runtime
// class value and as the row type of its table.
export type JobsThumbnailGenerationTable = InstanceType<typeof JobsThumbnailGenerationTable>;
export type JobsMetadataExtractionTable = InstanceType<typeof JobsMetadataExtractionTable>;
export type JobsVideoConversionTable = InstanceType<typeof JobsVideoConversionTable>;
export type JobsFaceDetectionTable = InstanceType<typeof JobsFaceDetectionTable>;
export type JobsFacialRecognitionTable = InstanceType<typeof JobsFacialRecognitionTable>;
export type JobsSmartSearchTable = InstanceType<typeof JobsSmartSearchTable>;
export type JobsDuplicateDetectionTable = InstanceType<typeof JobsDuplicateDetectionTable>;
export type JobsBackgroundTaskTable = InstanceType<typeof JobsBackgroundTaskTable>;
export type JobsStorageTemplateMigrationTable = InstanceType<typeof JobsStorageTemplateMigrationTable>;
export type JobsMigrationTable = InstanceType<typeof JobsMigrationTable>;
export type JobsSearchTable = InstanceType<typeof JobsSearchTable>;
export type JobsSidecarTable = InstanceType<typeof JobsSidecarTable>;
export type JobsLibraryTable = InstanceType<typeof JobsLibraryTable>;
export type JobsNotificationTable = InstanceType<typeof JobsNotificationTable>;
export type JobsBackupDatabaseTable = InstanceType<typeof JobsBackupDatabaseTable>;
export type JobsOcrTable = InstanceType<typeof JobsOcrTable>;
export type JobsWorkflowTable = InstanceType<typeof JobsWorkflowTable>;
export type JobsEditorTable = InstanceType<typeof JobsEditorTable>;
// Queue metadata table: one row per queue, keyed by queue name.
@Table('job_queue_meta')
export class JobQueueMetaTable {
  // Queue identifier (text primary key).
  @PrimaryColumn({ type: 'text' })
  queueName!: string;
  // Whether the queue is currently paused; defaults to false.
  @Column({ type: 'boolean', default: false })
  isPaused!: Generated<boolean>;
}
// Dead-letter table for permanently failed jobs, indexed by queue name for
// per-queue lookups.
@Table('job_failures')
@Index({ name: 'IDX_job_failures_queue', columns: ['queueName'] })
export class JobFailuresTable {
  // bigint identity primary key.
  @PrimaryColumn({ type: 'bigint', identity: true })
  id!: Generated<number>;
  // When the failure was recorded; defaults to now().
  @Column({ type: 'timestamp with time zone', default: () => 'now()' })
  failedAt!: Generated<Date>;
  // Queue the job belonged to.
  // NOTE(review): typed QueueName here but `string` in the JobFailureTable
  // row type above — confirm which is intended.
  @Column({ type: 'text' })
  queueName!: QueueName;
  // Job type discriminator.
  @Column({ type: 'smallint' })
  code!: JobCode;
  // Original job payload, if any.
  @Column({ type: 'jsonb', nullable: true })
  data!: unknown;
  // Failure message, if one was captured.
  @Column({ type: 'text', nullable: true })
  error!: string | null;
}

View File

@@ -29,7 +29,6 @@ import {
UnsupportedPostgresError,
} from 'src/utils/database-backups';
import { ImmichFileResponse } from 'src/utils/file';
import { handlePromiseError } from 'src/utils/misc';
@Injectable()
export class DatabaseBackupService {
@@ -68,7 +67,7 @@ export class DatabaseBackupService {
this.cronRepository.create({
name: 'backupDatabase',
expression: database.cronExpression,
onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.DatabaseBackup }), this.logger),
onTick: () => this.jobRepository.queue({ name: JobName.DatabaseBackup }),
start: database.enabled,
});
}

View File

@@ -53,7 +53,7 @@ export class JobService extends BaseService {
const response = await this.jobRepository.run(job);
await this.eventRepository.emit('JobSuccess', { job, response });
if (response && typeof response === 'string' && [JobStatus.Success, JobStatus.Skipped].includes(response)) {
await this.onDone(job);
void this.onDone(job).catch((error) => this.logger.error(`Failed to queue follow-up for ${job.name}: ${error}`));
}
} catch (error: Error | any) {
await this.eventRepository.emit('JobError', { job, error });

View File

@@ -47,7 +47,7 @@ export class LibraryService extends BaseService {
this.cronRepository.create({
name: CronJob.LibraryScan,
expression: scan.cronExpression,
onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.LibraryScanQueueAll }), this.logger),
onTick: () => this.jobRepository.queue({ name: JobName.LibraryScanQueueAll }),
start: scan.enabled,
});
}

View File

@@ -506,7 +506,6 @@ describe(NotificationService.name, () => {
it('should add new recipients for new images if job is already queued', async () => {
await sut.onAlbumUpdate({ id: '1', recipientId: '2' } as INotifyAlbumUpdateJob);
expect(mocks.job.removeJob).toHaveBeenCalledWith(JobName.NotifyAlbumUpdate, '1/2');
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.NotifyAlbumUpdate,
data: {

View File

@@ -218,7 +218,6 @@ export class NotificationService extends BaseService {
@OnEvent({ name: 'AlbumUpdate' })
async onAlbumUpdate({ id, recipientId }: ArgOf<'AlbumUpdate'>) {
await this.jobRepository.removeJob(JobName.NotifyAlbumUpdate, `${id}/${recipientId}`);
await this.jobRepository.queue({
name: JobName.NotifyAlbumUpdate,
data: { id, recipientId, delay: NotificationService.albumUpdateEmailDelayMs },

View File

@@ -28,8 +28,9 @@ import {
QueueName,
} from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { isConcurrentQueue } from 'src/repositories/job.repository';
import { BaseService } from 'src/services/base.service';
import { ConcurrentQueueName, JobItem } from 'src/types';
import { JobItem } from 'src/types';
import { handlePromiseError } from 'src/utils/misc';
const asNightlyTasksCron = (config: SystemConfig) => {
@@ -80,7 +81,7 @@ export class QueueService extends BaseService {
onBootstrap() {
this.jobRepository.setup(this.services);
if (this.worker === ImmichWorker.Microservices) {
this.jobRepository.startWorkers();
void this.jobRepository.startWorkers();
}
}
@@ -88,7 +89,7 @@ export class QueueService extends BaseService {
this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
let concurrency = 1;
if (this.isConcurrentQueue(queueName)) {
if (isConcurrentQueue(queueName)) {
concurrency = config.job[queueName].concurrency;
}
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
@@ -250,15 +251,6 @@ export class QueueService extends BaseService {
}
}
private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName {
return ![
QueueName.FacialRecognition,
QueueName.StorageTemplateMigration,
QueueName.DuplicateDetection,
QueueName.BackupDatabase,
].includes(name);
}
async handleNightlyJobs() {
const config = await this.getConfig({ withCache: false });
const jobs: JobItem[] = [];

View File

@@ -134,8 +134,8 @@ export class StorageTemplateService extends BaseService {
}
@OnEvent({ name: 'AssetMetadataExtracted' })
async onAssetMetadataExtracted({ source, assetId }: ArgOf<'AssetMetadataExtracted'>) {
await this.jobRepository.queue({ name: JobName.StorageTemplateMigrationSingle, data: { source, id: assetId } });
onAssetMetadataExtracted({ source, assetId }: ArgOf<'AssetMetadataExtracted'>) {
this.jobRepository.queue({ name: JobName.StorageTemplateMigrationSingle, data: { source, id: assetId } });
}
@OnJob({ name: JobName.StorageTemplateMigrationSingle, queue: QueueName.StorageTemplateMigration })

View File

@@ -12,6 +12,7 @@ export const compareParameters = (): Comparer<DatabaseParameter> => ({
{
type: 'ParameterReset',
databaseName: target.databaseName,
tableName: target.tableName,
parameterName: target.name,
reason: Reason.MissingInSource,
},

View File

@@ -3,11 +3,22 @@ import { Processor } from 'src/sql-tools/types';
export const processConfigurationParameters: Processor = (ctx, items) => {
for (const {
item: { options },
item: { object, options },
} of items.filter((item) => item.type === 'configurationParameter')) {
let tableName: string | undefined;
if (options.scope === 'table') {
const table = ctx.getTableByObject(object);
if (!table) {
ctx.warn('@ConfigurationParameter', `Unable to find table for table-scoped parameter "${options.name}"`);
continue;
}
tableName = table.name;
}
ctx.parameters.push({
databaseName: ctx.databaseName,
name: options.name,
tableName,
name: tableName ? `${tableName}.${options.name}` : options.name,
value: fromColumnValue(options.value),
scope: options.scope,
synchronize: options.synchronize ?? true,

View File

@@ -17,11 +17,11 @@ import { Processor } from 'src/sql-tools/types';
export const processors: Processor[] = [
processDatabases,
processConfigurationParameters,
processEnums,
processExtensions,
processFunctions,
processTables,
processConfigurationParameters,
processColumns,
processForeignKeyColumns,
processForeignKeyConstraints,

View File

@@ -17,4 +17,36 @@ export const readParameters: Reader = async (ctx, db) => {
synchronize: true,
});
}
// Read table-scoped storage parameters from pg_class.reloptions
const tableParams = await db
.selectFrom('pg_class')
.innerJoin('pg_namespace', 'pg_namespace.oid', 'pg_class.relnamespace')
.where('pg_namespace.nspname', '=', ctx.schemaName)
.where('pg_class.relkind', '=', sql.lit('r'))
.where('pg_class.reloptions', 'is not', null)
.select(['pg_class.relname as table_name', 'pg_class.reloptions'])
.execute();
for (const row of tableParams) {
if (!row.reloptions) {
continue;
}
for (const option of row.reloptions) {
const eqIdx = option.indexOf('=');
if (eqIdx === -1) {
continue;
}
const name = option.slice(0, eqIdx);
const value = option.slice(eqIdx + 1);
ctx.parameters.push({
name: `${row.table_name}.${name}`,
tableName: row.table_name,
value,
databaseName: ctx.databaseName,
scope: 'table',
synchronize: true,
});
}
}
};

View File

@@ -73,13 +73,13 @@ export const schemaDiff = (source: DatabaseSchema, target: DatabaseSchema, optio
const orderedItems = [
...itemMap.ExtensionCreate,
...itemMap.FunctionCreate,
...itemMap.ParameterSet,
...itemMap.ParameterReset,
...itemMap.EnumCreate,
...itemMap.TriggerDrop,
...itemMap.IndexDrop,
...itemMap.ConstraintDrop,
...itemMap.TableCreate,
...itemMap.ParameterSet,
...itemMap.ParameterReset,
...itemMap.ColumnAlter,
...itemMap.ColumnAdd,
...itemMap.ColumnRename,

View File

@@ -8,7 +8,7 @@ export const transformParameters: SqlTransformer = (ctx, item) => {
}
case 'ParameterReset': {
return asParameterReset(item.databaseName, item.parameterName);
return asParameterReset(item.databaseName, item.parameterName, item.tableName);
}
default: {
@@ -17,7 +17,19 @@ export const transformParameters: SqlTransformer = (ctx, item) => {
}
};
const getParameterName = (parameter: DatabaseParameter): string => {
if (parameter.scope === 'table' && parameter.tableName && parameter.name.startsWith(`${parameter.tableName}.`)) {
return parameter.name.slice(parameter.tableName.length + 1);
}
return parameter.name;
};
const asParameterSet = (parameter: DatabaseParameter): string => {
if (parameter.scope === 'table' && parameter.tableName) {
const paramName = getParameterName(parameter);
return `ALTER TABLE "${parameter.tableName}" SET (${paramName} = ${parameter.value})`;
}
let sql = '';
if (parameter.scope === 'database') {
sql += `ALTER DATABASE "${parameter.databaseName}" `;
@@ -28,6 +40,9 @@ const asParameterSet = (parameter: DatabaseParameter): string => {
return sql;
};
const asParameterReset = (databaseName: string, parameterName: string): string => {
const asParameterReset = (databaseName: string, parameterName: string, tableName?: string): string => {
if (tableName) {
return `ALTER TABLE "${tableName}" RESET (${parameterName})`;
}
return `ALTER DATABASE "${databaseName}" RESET "${parameterName}"`;
};

View File

@@ -86,6 +86,7 @@ export type PostgresDB = {
relhasindex: PostgresYesOrNo;
relisshared: PostgresYesOrNo;
relpersistence: string;
reloptions: string[] | null;
};
pg_constraint: {
@@ -306,6 +307,7 @@ export enum ActionType {
export type ColumnStorage = 'default' | 'external' | 'extended' | 'main';
export type ColumnType =
| '"char"'
| 'bigint'
| 'boolean'
| 'bytea'
@@ -316,6 +318,7 @@ export type ColumnType =
| 'integer'
| 'jsonb'
| 'polygon'
| 'smallint'
| 'text'
| 'time'
| 'time with time zone'
@@ -344,12 +347,13 @@ export type DatabaseSchema = {
export type DatabaseParameter = {
name: string;
databaseName: string;
tableName?: string;
value: string | number | null | undefined;
scope: ParameterScope;
synchronize: boolean;
};
export type ParameterScope = 'database' | 'user';
export type ParameterScope = 'database' | 'table' | 'user';
export type DatabaseOverride = {
name: string;
@@ -506,7 +510,7 @@ export type SchemaDiff = { reason: string } & (
| { type: 'TriggerCreate'; trigger: DatabaseTrigger }
| { type: 'TriggerDrop'; tableName: string; triggerName: string }
| { type: 'ParameterSet'; parameter: DatabaseParameter }
| { type: 'ParameterReset'; databaseName: string; parameterName: string }
| { type: 'ParameterReset'; databaseName: string; tableName?: string; parameterName: string }
| { type: 'EnumCreate'; enum: DatabaseEnum }
| { type: 'EnumDrop'; enumName: string }
| { type: 'OverrideCreate'; override: DatabaseOverride }

View File

@@ -0,0 +1,470 @@
import { Kysely, Selectable, sql } from 'kysely';
import postgres from 'postgres';
import { JOB_CODE_TO_NAME, JobCode, JobQueueStatus, QueueName } from 'src/enum';
import { DB } from 'src/schema';
import { JobTable } from 'src/schema/tables/job.table';
import { JobItem } from 'src/types';
/** Quote a value for PostgreSQL COPY ... CSV: wrap in double quotes, doubling any embedded quotes. */
const csvEscape = (value: string): string => {
  const doubled = value.split('"').join('""');
  return `"${doubled}"`;
};
/** Shape of one job row buffered for insertion into a queue table. */
export type InsertRow = {
  // Compact numeric job identifier (reverse-mapped to a JobName via JOB_CODE_TO_NAME).
  code: JobCode;
  // Job payload; serialized to JSON on write (null/undefined becomes SQL NULL).
  data: unknown;
  // Claim ordering: higher priority is claimed first; null is written as 0.
  priority: number | null;
  // Optional dedup key; rows with a key are written via INSERT ... ON CONFLICT
  // instead of COPY so duplicates can be merged.
  dedupKey: string | null;
  // Earliest time the job may run; null means "now".
  runAfter: Date | null;
};
/** Resolve the queue's physical table name and alias it as "t" for dynamic query building. */
export const getTable = (db: Kysely<DB>, queueName: QueueName) => {
  const tableName = QUEUE_TABLE[queueName];
  return db.dynamic.table(tableName).as('t');
};
/**
 * Postgres-backed worker for a single queue.
 *
 * Claims pending rows with FOR NO KEY UPDATE SKIP LOCKED, runs them through
 * the supplied job handler, retries failures with exponential (base-5)
 * backoff, dead-letters jobs that exhaust their retries, and recovers stalled
 * jobs whose expiry heartbeat lapsed (e.g. after a crash).
 */
export class QueueWorker {
  // Number of jobs currently executing; exposed for introspection.
  activeJobCount = 0;

  private concurrency: number;
  // Jobs currently being processed, keyed by row id; drives the heartbeat and shutdown re-queue.
  private activeJobs = new Map<number, { startedAt: number }>();
  // Optimistic flag: true when the queue table may hold claimable rows.
  private hasPending = true;
  // Re-entrancy guard so only one fetch loop runs at a time.
  private fetching = false;
  private paused = false;
  private stopped = false;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
  private sweepTimer: ReturnType<typeof setTimeout> | null = null;
  private readonly queueName: QueueName;
  private readonly table: ReturnType<typeof getTable>;
  private readonly stallTimeout: number;
  private readonly claimBatch: number;
  private readonly maxRetries: number;
  private readonly backoffBaseMs: number;
  private readonly db: Kysely<DB>;
  private readonly onJobFn: (job: JobItem) => Promise<unknown>;

  constructor(options: QueueWorkerOptions) {
    this.queueName = options.queueName;
    this.stallTimeout = options.stallTimeout;
    this.claimBatch = options.claimBatch;
    this.maxRetries = options.maxRetries;
    this.backoffBaseMs = options.backoffBaseMs;
    this.concurrency = options.concurrency;
    this.db = options.db;
    this.table = getTable(this.db, options.queueName);
    this.onJobFn = options.onJob;
    // One-shot sweep after stallTimeout to recover jobs orphaned by a crash
    // that restarted before their expiry passed
    this.sweepTimer = setTimeout(() => {
      // Fix: null the handle once fired so shutdown() does not clear a stale timer id.
      this.sweepTimer = null;
      this.onNotification();
    }, this.stallTimeout);
  }

  /** Called on LISTEN/NOTIFY: mark work as potentially available and kick the fetch loop. */
  onNotification() {
    this.hasPending = true;
    void this.tryFetch();
  }

  /** Adjust the concurrency limit and try to fill any newly-freed slots. */
  setConcurrency(n: number) {
    this.concurrency = n;
    // Fix: assume work may be waiting. Without this, raising concurrency after
    // hasPending was cleared could stall until the next notification, because
    // the sweep timer is one-shot.
    this.hasPending = true;
    void this.tryFetch();
  }

  /** Stop claiming new jobs; in-flight jobs run to completion. */
  pause() {
    this.paused = true;
  }

  /** Resume claiming and re-check the table for pending work. */
  resume() {
    this.paused = false;
    this.hasPending = true;
    void this.tryFetch();
  }

  /**
   * Stop the worker and hand any still-active jobs back to the queue
   * (status reset to Pending so another worker can pick them up).
   * NOTE(review): in-flight handlers are not awaited, so a re-queued job may
   * also finish locally — presumably handlers are idempotent; confirm.
   */
  shutdown() {
    this.stopped = true;
    this.stopHeartbeat();
    if (this.sweepTimer) {
      clearTimeout(this.sweepTimer);
      this.sweepTimer = null;
    }
    if (this.activeJobs.size === 0) {
      return Promise.resolve();
    }
    // Re-queue active jobs
    const ids = [...this.activeJobs.keys()];
    return this.db
      .updateTable(this.table)
      .set({
        status: JobQueueStatus.Pending,
        startedAt: null,
        expiresAt: null,
      })
      .where('id', 'in', ids)
      .execute();
  }

  private get slotsAvailable() {
    return Math.max(0, this.concurrency - this.activeJobCount);
  }

  /**
   * Fetch loop: claim jobs in batches until slots run out or the table is
   * drained. When a claim comes back empty, attempt one stalled-job recovery
   * pass before concluding there is no pending work.
   */
  private async tryFetch() {
    if (this.fetching || this.paused || this.stopped) {
      return;
    }
    this.fetching = true;
    try {
      while (this.slotsAvailable > 0 && this.hasPending && !this.stopped) {
        const limit = Math.min(this.slotsAvailable, this.claimBatch);
        const jobs = await this.claim(limit);
        if (jobs.length === 0) {
          const recovered = await this.recoverStalled();
          if (recovered.numChangedRows === 0n) {
            this.hasPending = false;
            break;
          }
          continue;
        }
        this.activeJobCount += jobs.length;
        for (const job of jobs) {
          void this.processJob(job);
        }
      }
    } finally {
      this.fetching = false;
    }
  }

  /**
   * Run one claimed job. On completion (success, retry, or dead-letter) the
   * bookkeeping write and the claim of the next job happen in a single
   * statement, so the slot chains directly into the next job when one is ready.
   */
  private async processJob(row: Selectable<JobTable>) {
    this.activeJobs.set(row.id, { startedAt: Date.now() });
    this.startHeartbeat();
    try {
      const jobName = JOB_CODE_TO_NAME[row.code];
      if (!jobName) {
        throw new Error(`Unknown job char code: ${row.code}`);
      }
      await this.onJobFn({ name: jobName, data: row.data } as JobItem);
      // Success: delete completed job and try to fetch next
      const next = this.stopped ? undefined : await this.completeAndFetch(row.id).catch(() => undefined);
      this.activeJobs.delete(row.id);
      if (next) {
        void this.processJob(next);
      } else {
        this.activeJobCount--;
        this.hasPending = false;
      }
    } catch (error: unknown) {
      const errorMsg = error instanceof Error ? error.message : String(error);
      // Retry with backoff while attempts remain; otherwise move to the dead-letter table.
      // NOTE(review): when the retried job's backoff elapses, nothing local wakes this
      // worker — it relies on a later notification or another claim cycle; confirm.
      const next =
        row.retries < this.maxRetries
          ? await this.retryAndFetch(row.id, row.retries).catch(() => undefined)
          : await this.deadLetterAndFetch(row, errorMsg).catch(() => undefined);
      this.activeJobs.delete(row.id);
      if (next) {
        void this.processJob(next);
      } else {
        this.activeJobCount--;
        this.hasPending = false;
      }
    } finally {
      if (this.activeJobs.size === 0) {
        this.stopHeartbeat();
      }
    }
  }

  /**
   * Claim up to `limit` pending jobs.
   * Uses a materialized CTE with FOR NO KEY UPDATE SKIP LOCKED
   * to avoid race conditions and excessive locking.
   */
  private claim(limit: number) {
    return this.db
      .with(
        (wb) => wb('candidates').materialized(),
        (qb) =>
          qb
            .selectFrom(this.table)
            .select('id')
            .where('status', '=', JobQueueStatus.Pending)
            .where('runAfter', '<=', sql<Date>`now()`)
            .orderBy('priority', 'desc')
            .orderBy('id', 'asc')
            .limit(limit)
            .forNoKeyUpdate()
            .skipLocked(),
      )
      .updateTable(this.table)
      .set({
        status: JobQueueStatus.Active,
        startedAt: sql<Date>`now()`,
        // stallTimeout is a numeric option, so sql.lit interpolation is safe here.
        expiresAt: sql<Date>`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`,
      })
      .where((eb) => eb('id', 'in', eb.selectFrom('candidates').select('id')))
      .returningAll()
      .execute();
  }

  /**
   * Atomically delete a completed job and claim the next one.
   */
  private completeAndFetch(jobId: number) {
    // `any` casts: the dynamic table reference defeats Kysely's CTE typing.
    const prefix = this.db.with('mark', (qb: any) => qb.deleteFrom(this.table).where('id', '=', jobId));
    return this.claimNext(prefix as any);
  }

  /**
   * Atomically retry a failed job (reset to pending with backoff) and claim the next ready one.
   */
  private retryAndFetch(jobId: number, retries: number) {
    // Exponential backoff, base 5: backoffBaseMs, 5x, 25x, ...
    const backoffMs = this.backoffBaseMs * 5 ** retries;
    const prefix = this.db.with('mark', (qb) =>
      qb
        .updateTable(this.table)
        .set({
          status: JobQueueStatus.Pending,
          retries: retries + 1,
          runAfter: sql<Date>`now() + ${sql.lit(`'${backoffMs} milliseconds'`)}::interval`,
          startedAt: null,
          expiresAt: null,
        })
        .where('id', '=', jobId),
    );
    return this.claimNext(prefix as any);
  }

  /**
   * Atomically delete a permanently failed job, log it to the dead-letter table, and claim the next one.
   */
  private deadLetterAndFetch(row: Selectable<JobTable>, errorMsg: string) {
    const prefix = this.db
      .with('mark', (qb) => qb.deleteFrom(this.table).where('id', '=', row.id))
      .with('logged', (qb) =>
        qb.insertInto('job_failures').values({
          queueName: this.queueName,
          code: row.code,
          data: row.data,
          error: errorMsg,
        }),
      );
    return this.claimNext(prefix as any);
  }

  /**
   * Shared suffix: claim the next pending job. Appended after prefix CTEs (mark, logged, etc.).
   */
  private claimNext(prefix: Kysely<DB>) {
    return prefix
      .with('next', (qb) =>
        qb
          .selectFrom(this.table)
          .select('id')
          .where('status', '=', JobQueueStatus.Pending)
          .where('runAfter', '<=', sql<Date>`now()`)
          .orderBy('priority', 'desc')
          .orderBy('id', 'asc')
          .limit(1)
          .forNoKeyUpdate()
          .skipLocked(),
      )
      .updateTable(this.table)
      .set({
        status: JobQueueStatus.Active,
        startedAt: sql<Date>`now()`,
        expiresAt: sql<Date>`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`,
      })
      .where((eb) => eb('id', '=', eb.selectFrom('next').select('id')))
      .returningAll()
      .executeTakeFirst();
  }

  /**
   * Recover stalled jobs: reset jobs whose expires_at has passed
   */
  private recoverStalled() {
    return this.db
      .updateTable(this.table)
      .set({
        status: JobQueueStatus.Pending,
        startedAt: null,
        expiresAt: null,
      })
      .where('status', '=', JobQueueStatus.Active)
      .where('expiresAt', '<', sql<Date>`now()`) // needed for multi-instance safety
      .executeTakeFirst();
  }

  /**
   * Extend expiry for all active jobs (heartbeat).
   * Returns undefined when there is nothing to extend.
   */
  private extendExpiry() {
    if (this.activeJobs.size === 0) {
      return;
    }
    const ids = [...this.activeJobs.keys()];
    return this.db
      .updateTable(this.table)
      .set({
        expiresAt: sql<Date>`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`,
      })
      .where('id', 'in', ids)
      .execute();
  }

  private startHeartbeat() {
    if (this.heartbeatTimer) {
      return;
    }
    // Extend expiry every half stall-timeout; on failure, retry once after 5s.
    // Fix: the retry's promise rejection was previously unhandled, which could
    // surface as an unhandled rejection on a transient DB error.
    this.heartbeatTimer = setInterval(() => {
      void this.extendExpiry()?.catch(() => {
        setTimeout(() => void this.extendExpiry()?.catch(() => {}), 5000);
      });
    }, Math.floor(this.stallTimeout / 2));
  }

  private stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
}
/**
 * Batches job-row writes per queue and flushes them on a short (10ms) debounce.
 * Rows without a dedup key go through fast COPY; rows with a dedup key use
 * INSERT ... ON CONFLICT so duplicates merge. After each write the queue is
 * notified so workers wake up.
 */
export class WriteBuffer {
  // Pending rows per queue, awaiting the next flush.
  private buffers = Object.fromEntries(Object.values(QueueName).map((name) => [name as QueueName, [] as InsertRow[]]));
  // Debounce timer: armed by the first add() after a flush, cleared in flush().
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private pgPool: postgres.Sql,
    private notify: (queue: QueueName) => Promise<unknown>,
    // Invoked when a debounced flush rejects; direct flush() callers handle errors themselves.
    private onFlushError?: (error: unknown) => void,
  ) {}

  /** Buffer rows for their queues and arm the debounce timer if not already pending. */
  add(items: { queue: QueueName; row: InsertRow }[]): void {
    if (items.length === 0) {
      return;
    }
    for (const { queue, row } of items) {
      this.buffers[queue].push(row);
    }
    if (!this.timer) {
      this.timer = setTimeout(() => void this.flush().catch((error) => this.onFlushError?.(error)), 10);
    }
  }

  /** Write out all buffered rows (COPY and INSERT paths per queue) and notify each touched queue. */
  async flush(): Promise<void> {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    const promises: Promise<unknown>[] = [];
    for (const [queue, rows] of Object.entries(this.buffers)) {
      if (rows.length === 0) {
        continue;
      }
      const queueName = queue as QueueName;
      const tableName = QUEUE_TABLE[queueName];
      // Split: dedup-keyed rows need ON CONFLICT semantics; the rest can stream via COPY.
      const copyRows: InsertRow[] = [];
      const insertRows: InsertRow[] = [];
      for (const row of rows) {
        if (row.dedupKey) {
          insertRows.push(row);
        } else {
          copyRows.push(row);
        }
      }
      // Clear the shared buffer in place before awaiting, so concurrent add() calls
      // start a fresh batch rather than racing this one.
      // NOTE(review): rows are dropped before the writes settle, so a failed write
      // loses the batch (reported only via onFlushError) — confirm this is acceptable.
      rows.length = 0;
      if (copyRows.length > 0) {
        promises.push(this.copyInsert(tableName, copyRows).then(() => this.notify(queueName)));
      }
      if (insertRows.length > 0) {
        promises.push(this.insertChunk(tableName, insertRows).then(() => this.notify(queueName)));
      }
    }
    await Promise.all(promises);
  }

  /** Bulk-insert rows via COPY ... FROM STDIN in CSV format on a reserved connection. */
  private async copyInsert(tableName: string, rows: InsertRow[]) {
    const conn = await this.pgPool.reserve();
    try {
      const writable = await conn`COPY ${conn(tableName)} (code, data, priority, "runAfter") FROM STDIN WITH (FORMAT csv)`.writable();
      const now = new Date().toISOString();
      for (const row of rows) {
        // Only the JSON payload needs CSV quoting; an empty unquoted field reads as NULL in CSV COPY.
        const data = row.data != null ? csvEscape(JSON.stringify(row.data)) : '';
        const priority = row.priority ?? 0;
        const runAfter = row.runAfter ? row.runAfter.toISOString() : now;
        writable.write(`${row.code},${data},${priority},${runAfter}\n`);
      }
      writable.end();
      // Wait for the COPY stream to fully drain (or fail) before releasing the connection.
      await new Promise<void>((resolve, reject) => {
        writable.on('finish', resolve);
        writable.on('error', reject);
      });
    } finally {
      conn.release();
    }
  }

  /**
   * Insert dedup-keyed rows in one statement using unnest'ed column arrays.
   * On a dedup-key conflict, refresh runAfter/data only while the existing job
   * is still Pending (active/finished jobs are left untouched).
   */
  private insertChunk(tableName: string, rows: InsertRow[]) {
    const now = new Date().toISOString();
    const code = [];
    const data = [];
    const priority = [];
    const dedupKey = [];
    const runAfter = [];
    for (const row of rows) {
      code.push(row.code);
      data.push(row.data ?? null);
      priority.push(row.priority ?? 0);
      dedupKey.push(row.dedupKey);
      runAfter.push(row.runAfter?.toISOString() ?? now);
    }
    return this.pgPool`
      INSERT INTO ${this.pgPool(tableName)} (code, data, priority, "dedupKey", "runAfter")
      SELECT * FROM unnest(
        ${code}::smallint[],
        ${data as any}::jsonb[],
        ${priority}::smallint[],
        ${dedupKey}::text[],
        ${runAfter}::timestamptz[]
      )
      ON CONFLICT ("dedupKey") WHERE "dedupKey" IS NOT NULL
      DO UPDATE SET "runAfter" = EXCLUDED."runAfter", data = EXCLUDED.data
      WHERE ${this.pgPool(tableName)}.status = ${JobQueueStatus.Pending}
    `;
  }
}
// Maps each logical QueueName to the Postgres table backing it.
// Keep in sync with the corresponding table migrations.
const QUEUE_TABLE = {
  [QueueName.ThumbnailGeneration]: 'jobs_thumbnail_generation',
  [QueueName.MetadataExtraction]: 'jobs_metadata_extraction',
  [QueueName.VideoConversion]: 'jobs_video_conversion',
  [QueueName.FaceDetection]: 'jobs_face_detection',
  [QueueName.FacialRecognition]: 'jobs_facial_recognition',
  [QueueName.SmartSearch]: 'jobs_smart_search',
  [QueueName.DuplicateDetection]: 'jobs_duplicate_detection',
  [QueueName.BackgroundTask]: 'jobs_background_task',
  [QueueName.StorageTemplateMigration]: 'jobs_storage_template_migration',
  [QueueName.Migration]: 'jobs_migration',
  [QueueName.Search]: 'jobs_search',
  [QueueName.Sidecar]: 'jobs_sidecar',
  [QueueName.Library]: 'jobs_library',
  [QueueName.Notification]: 'jobs_notification',
  [QueueName.BackupDatabase]: 'jobs_backup_database',
  [QueueName.Ocr]: 'jobs_ocr',
  [QueueName.Workflow]: 'jobs_workflow',
  [QueueName.Editor]: 'jobs_editor',
} as const;
/** Configuration for a single QueueWorker instance. */
interface QueueWorkerOptions {
  // Queue this worker serves; also selects the backing table.
  queueName: QueueName;
  // Milliseconds before an active job's expiry lapses and it becomes recoverable;
  // also drives the heartbeat interval (half this value).
  stallTimeout: number;
  // Maximum number of jobs claimed per fetch round-trip.
  claimBatch: number;
  // Retries allowed before a failing job is moved to the dead-letter table.
  maxRetries: number;
  // First retry delay in milliseconds; multiplied by 5 per subsequent attempt.
  backoffBaseMs: number;
  // Maximum number of jobs processed simultaneously.
  concurrency: number;
  db: Kysely<DB>;
  // Handler invoked for each claimed job; a rejection counts as a failed attempt.
  onJob: (job: JobItem) => Promise<unknown>;
}

View File

@@ -9,13 +9,6 @@ const envData: EnvData = {
logFormat: LogFormat.Console,
buildMetadata: {},
bull: {
config: {
connection: {},
prefix: 'immich_bull',
},
queues: [{ name: 'queue-1' }],
},
cls: {
config: {},

View File

@@ -12,13 +12,13 @@ export const newJobRepositoryMock = (): Mocked<RepositoryInterface<JobRepository
pause: vitest.fn(),
resume: vitest.fn(),
searchJobs: vitest.fn(),
queue: vitest.fn().mockImplementation(() => Promise.resolve()),
queueAll: vitest.fn().mockImplementation(() => Promise.resolve()),
queue: vitest.fn(),
queueAll: vitest.fn(),
isActive: vitest.fn(),
isPaused: vitest.fn(),
getJobCounts: vitest.fn(),
clear: vitest.fn(),
waitForQueueCompletion: vitest.fn(),
removeJob: vitest.fn(),
onShutdown: vitest.fn(),
};
};