Design: Idempotency Helpers (Phase 5a)
Overview
This document is the implementation-shaped design for the idempotency helpers in Phase 5a. It pairs with the main Phase 5a design at index.md § 5 and is governed by DQ-R1-031.
The package addresses two seams:
- Intent (a): inbound idempotency. L4 receives an HTTP request with an `Idempotency-Key` header; the L3 service de-duplicates retries against an `IdempotencyStore`. If the same key is replayed, the store returns the prior outcome.
- Intent (b): outbound retry safety. L3 generates a deterministic idempotency key from a natural key (tenant + entity + operation) and passes it to a downstream API (Postmark, Stripe, etc.). The same natural key always produces the same key, so retries against the downstream API are safe even when the in-flight bookkeeping is lost.
Two-tier API
The design has two layers:
1. `RawIdempotencyStore`: the native interface. Operates on `JsonElement` symmetrically: request payloads and result payloads are `JsonElement` at the store boundary. Schema evolution is the caller’s concern.
2. `IdempotencyStore<Req, Res>`: a typed wrapper produced by `inline fun <reified Req, reified Res> RawIdempotencyStore.typedAs(json: Json = JsonConfig.standardJson): IdempotencyStore<Req, Res>`. The wrapper captures resolved `KSerializer<Req>` / `KSerializer<Res>` references once at wiring time and delegates to the native store for all storage operations.
Consumers that want type safety call typedAs() once at wiring time and use the typed view. Consumers that want explicit control over the idempotency-hash projection (e.g., to strip transient fields or project to a stable shape across Req versions) use the native store directly.
Schema evolution
`Req` schema evolution, the case where a consumer changes a `Req` data class in a serialisation-affecting way, is caller-controlled:
- A consumer that doesn’t care about evolution uses `typedAs()` with `JsonConfig.standardJson` and accepts that `Req` shape changes will cause in-flight records to surface as `Mismatch` until they expire (operational mitigation: drain before deploy).
- A consumer that needs stable hashes across `Req` versions writes a custom `KSerializer<Req>` (registered in a refined `SerializersModule`) that emits a stable JSON shape. The custom serializer is passed via a refined `Json` to `typedAs(json = customJson)`.
- A consumer that needs full control projects `Req` to a stable `JsonElement` shape explicitly and calls the native `RawIdempotencyStore` directly.
No request_schema_version column ships in v1. The store-side defence is replaced by the caller-controlled typed-wrapper boundary.
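To make the first bullet concrete, here is a minimal sketch of why a `Req` shape change surfaces as `Mismatch`. It assumes the request hash is SHA-256 over the serialized bytes; the `requestHash` helper and the two JSON payloads are hypothetical and exist only for illustration.

```kotlin
import java.security.MessageDigest

// Hypothetical helper: SHA-256 over the UTF-8 bytes of an already-canonical JSON string.
fun requestHash(canonicalJson: String): ByteArray =
    MessageDigest.getInstance("SHA-256").digest(canonicalJson.toByteArray(Charsets.UTF_8))

fun main() {
    // The same logical request as serialized by two versions of a hypothetical Req class:
    // v1 names the field "recipient"; v2 renames it to "to".
    val recordedV1 = """{"recipient":"a@example.com","template":"welcome"}"""
    val replayedV1 = """{"recipient":"a@example.com","template":"welcome"}"""
    val replayedV2 = """{"template":"welcome","to":"a@example.com"}"""

    val recorded = requestHash(recordedV1)
    println(recorded.contentEquals(requestHash(replayedV1))) // true: unchanged shape replays cleanly
    println(recorded.contentEquals(requestHash(replayedV2))) // false: renamed field reads as a different request
}
```

A custom serializer or an explicit `JsonElement` projection avoids the second case by keeping the serialized shape fixed while the Kotlin class changes.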
Decision Summary
| # | Decision | Choice |
|---|---|---|
| DQ-R1-031 | Type shape at the store boundary | RawIdempotencyStore (native JsonElement, symmetric Req/Res) + IdempotencyStore<Req, Res> typed wrapper via inline fun typedAs(). |
| DQ-R1-031 | Decode-failure handling | Typed wrapper returns Result.failure(AppError.Internal.IncompatibleState) when deserialization fails at replay time. Decode failures are bug-class and page on-call. |
| DQ-R1-031 | Mismatch shape | Carries recordedRequest: JsonElement for operational debugging on hash mismatch. |
| DQ-R1-031 | Schema evolution | Caller-controlled via typedAs(json = ...) configuration or native-store JsonElement projection. No store-wide schema-version column. |
| DQ-R1-031 | Storage column type | JSONB for request_payload, result_payload, error_payload. |
| DQ-R1-031 | Mismatch.equals / hashCode override | Kept (ByteArray content-equality required for tests and consumer-side caches). |
| DQ-R1-031 | replayWindowOverride parameter on begin() | Kept (Duration? = null; per-request override; default uses store window). |
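The `Mismatch.equals` / `hashCode` row above depends on a Kotlin detail: `ByteArray` compares by reference unless told otherwise. The sketch below shows one way the override could look. It uses a simplified stand-in class (`recordedRequest` is a `String` here rather than a `JsonElement`), and including `recordedRequest` in the comparison is an assumption, not something the design fixes.

```kotlin
// Simplified stand-in for the native Mismatch outcome (hypothetical shape).
class Mismatch(
    val recordedHash: ByteArray,
    val submittedHash: ByteArray,
    val recordedRequest: String,
) {
    override fun equals(other: Any?): Boolean =
        this === other ||
            (other is Mismatch &&
                recordedHash.contentEquals(other.recordedHash) &&
                submittedHash.contentEquals(other.submittedHash) &&
                recordedRequest == other.recordedRequest)

    override fun hashCode(): Int {
        var result = recordedHash.contentHashCode()
        result = 31 * result + submittedHash.contentHashCode()
        result = 31 * result + recordedRequest.hashCode()
        return result
    }
}

fun main() {
    val a = Mismatch(byteArrayOf(1, 2), byteArrayOf(3, 4), "{}")
    val b = Mismatch(byteArrayOf(1, 2), byteArrayOf(3, 4), "{}")
    println(byteArrayOf(1, 2) == byteArrayOf(1, 2)) // false: arrays compare by reference
    println(a == b)                                  // true: hashes compared by content
    println(setOf(a, b).size)                        // 1: usable as a set element or cache key
}
```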
1. Package layout
All code lives under `cards.arda.common.lib.runtime.idempotency`:

```
runtime/idempotency/
├── core/
│   ├── ConsumerNamespace.kt          -- value class; identifies a logical consumer
│   ├── IdempotencyKey.kt             -- data class; namespace + key value
│   ├── IdempotencyOutcome.kt         -- sealed class; native outcomes (raw store)
│   ├── IdempotencyKeyMinter.kt       -- Intent (b) deterministic key generation
│   └── CanonicalJson.kt              -- internal canonical-JSON helper
├── service/
│   ├── RawIdempotencyStore.kt        -- native interface (JsonElement)
│   ├── IdempotencyStore.kt           -- typed interface (Req, Res)
│   ├── ExposedRawIdempotencyStore.kt -- native Exposed-based implementation
│   ├── IdempotencyStoreImpl.kt       -- typed wrapper over the native store
│   ├── IdempotencyStoreFactory.kt    -- factory with three entry points
│   └── TypedAs.kt                    -- inline extension producing the typed wrapper
└── api/
    └── HeaderParsing.kt              -- Idempotency-Key HTTP header parsing
```

`api/` is flat (no `api/rest/` subdirectory) so future gRPC / SQS adapters can drop in as siblings without renaming.
2. Public API surface
2.1 core/ConsumerNamespace.kt

```kotlin
package cards.arda.common.lib.runtime.idempotency.core

import kotlinx.serialization.Serializable

/**
 * Identifies a logical consumer (an L3 service, typically) within the shared
 * idempotency table. Each consumer in the same JVM uses its own
 * [ConsumerNamespace] constant; the store partitions rows accordingly.
 *
 * Example consumer-side declaration:
 *     val EMAIL_JOB_NAMESPACE = ConsumerNamespace("email-job")
 */
@JvmInline
@Serializable
value class ConsumerNamespace(val value: String) {
    init {
        require(value.isNotBlank()) { "ConsumerNamespace must not be blank" }
        require(value.length <= MAX_LENGTH) {
            "ConsumerNamespace must be at most $MAX_LENGTH characters"
        }
        require(value.all { it == '-' || it == '_' || it.isLowerCase() || it.isDigit() }) {
            "ConsumerNamespace must contain only lowercase letters, digits, '-', and '_'"
        }
    }

    companion object {
        const val MAX_LENGTH = 64
    }
}
```

Validation in `init` is the only place `require` is used in this package: the value class is created at consumer-module declaration time (a programming error if invalid), not from external input. Per kotlin-coding, `require` is acceptable for true programming errors.
2.2 core/IdempotencyKey.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.core

import kotlinx.serialization.Serializable

/**
 * An idempotency key, composed of a [ConsumerNamespace] and a per-consumer
 * key value. The pair (namespace, value) uniquely identifies an in-flight
 * or recorded operation in the idempotency store.
 *
 * Keys are constructed via:
 * - [IdempotencyKeyMinter] (Intent (b) -- deterministic, from natural keys)
 * - [parseIdempotencyHeader] (Intent (a) -- caller-supplied via HTTP header)
 */
@Serializable
data class IdempotencyKey internal constructor(
    val namespace: ConsumerNamespace,
    val value: String,
) {
    companion object {
        const val MAX_VALUE_LENGTH = 255
    }
}
```

The constructor is `internal`: keys must come through the minter or the header parser; consumers don’t construct them directly. This guarantees the format invariants hold at every call site.
2.3 core/IdempotencyOutcome.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.core

import cards.arda.common.lib.lang.errors.AppError
import kotlinx.serialization.json.JsonElement

/**
 * The result of a [RawIdempotencyStore.begin] call. Each variant tells the
 * caller what to do next.
 *
 * Typed callers see [IdempotencyStore]'s outcome shape, which is the same
 * tree with `Req` / `Res` substituted for `JsonElement`.
 */
sealed class IdempotencyOutcome {

    /**
     * No prior record for this key. The caller proceeds with the operation
     * and must eventually call [RawIdempotencyStore.commit] /
     * [RawIdempotencyStore.failPermanent] / [RawIdempotencyStore.failTransient]
     * to close the in-flight row.
     */
    data object FreshAttempt : IdempotencyOutcome()

    /**
     * A prior record exists with the same request payload and a committed
     * result. Replay returns that result; caller does not re-execute.
     */
    data class PriorResult(val value: JsonElement) : IdempotencyOutcome()

    /**
     * A prior record exists with the same request payload and a permanent
     * failure. Replay returns that error; caller does not re-execute.
     */
    data class PriorError(val error: AppError) : IdempotencyOutcome()

    /**
     * A prior record exists for the same key but with a *different* request
     * payload. The caller MUST reject the new request -- accepting it would
     * silently overwrite the prior result.
     *
     * [recordedRequest] is the prior request's `JsonElement` as it was first
     * committed, for operational debugging. (Typed wrappers may attempt to
     * deserialize it; deserialization failure is non-fatal -- the mismatch
     * itself is the actionable signal.)
     *
     * [equals] / [hashCode] are overridden to compare [recordedHash] /
     * [submittedHash] by content rather than identity (the default `ByteArray`
     * equality is reference equality and would break test assertions and
     * consumer-side caches).
     */
    class Mismatch(
        val recordedHash: ByteArray,
        val submittedHash: ByteArray,
        val recordedRequest: JsonElement,
    ) : IdempotencyOutcome() {
        override fun equals(other: Any?): Boolean = ...
        override fun hashCode(): Int = ...
    }

    /**
     * A prior record exists for the same key with status `in_progress`. The
     * caller backs off (typically: surface a 409 / Retry-After at L4) and
     * retries at the protocol layer once the in-flight attempt completes.
     */
    data object InFlight : IdempotencyOutcome()
}
```

2.4 core/IdempotencyKeyMinter.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.core

/**
 * Deterministic idempotency-key generation from a list of natural-key parts
 * (Intent (b)).
 *
 * Same parts in the same order produce the same key across invocations,
 * across processes, and across JVM restarts. The hash function is SHA-256
 * over a canonical encoding of the parts (length-prefixed, UTF-8); collision
 * resistance is whatever SHA-256 provides.
 *
 * The minter is bound to a single [ConsumerNamespace] at construction time
 * so a single consumer can't accidentally collide its own keys against
 * another consumer's namespace.
 */
class IdempotencyKeyMinter(private val namespace: ConsumerNamespace) {
    fun mint(parts: List<String>): Result<IdempotencyKey> = ...
}
```

`mint` returns `Result.failure(AppError.Invocation.GeneralValidation)` if `parts` is empty or any part is blank.
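The length-prefixed encoding matters because plain concatenation would let `["ab", "c"]` and `["a", "bc"]` hash identically. The following is a rough sketch of the idea, not the package's implementation: `mintValue` is a hypothetical stand-in that returns a hex string instead of an `IdempotencyKey`, uses `IllegalArgumentException` in place of `AppError.Invocation.GeneralValidation`, and assumes a 4-byte big-endian length prefix and hex output.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import java.security.MessageDigest

// Hypothetical stand-in for IdempotencyKeyMinter.mint (see assumptions in the lead-in).
fun mintValue(parts: List<String>): Result<String> {
    if (parts.isEmpty() || parts.any { it.isBlank() }) {
        return Result.failure(IllegalArgumentException("parts must be non-empty and non-blank"))
    }
    val buffer = ByteArrayOutputStream()
    DataOutputStream(buffer).use { out ->
        for (part in parts) {
            val bytes = part.toByteArray(Charsets.UTF_8)
            out.writeInt(bytes.size) // assumed prefix: 4-byte big-endian length
            out.write(bytes)
        }
    }
    val digest = MessageDigest.getInstance("SHA-256").digest(buffer.toByteArray())
    return Result.success(digest.joinToString("") { "%02x".format(it) })
}

fun main() {
    val a = mintValue(listOf("tenant-1", "invoice-42", "send")).getOrThrow()
    val b = mintValue(listOf("tenant-1", "invoice-42", "send")).getOrThrow()
    val c = mintValue(listOf("ab", "c")).getOrThrow()
    val d = mintValue(listOf("a", "bc")).getOrThrow()
    println(a == b)                            // true: same parts, same key
    println(c == d)                            // false: the length prefix separates the parts
    println(a.length)                          // 64 hex characters, within MAX_VALUE_LENGTH
    println(mintValue(emptyList()).isFailure)  // true
}
```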
2.5 core/CanonicalJson.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.core

import cards.arda.common.lib.lang.serialization.JsonConfig
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonArray
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.JsonPrimitive

/**
 * Canonical-JSON helper used by the idempotency store to compute stable
 * hashes from `JsonElement` payloads.
 *
 * "Canonical" here means: same logical input produces the same bytes,
 * regardless of map insertion order, `Set` iteration order, or whitespace.
 * `JsonArray` order is preserved (array order is semantically meaningful
 * in JSON).
 *
 * The output uses [JsonConfig.refine] to produce a Json with
 * `prettyPrint = false`; all other settings inherit from
 * `JsonConfig.standardJson`.
 */
internal object CanonicalJson {

    private val canonicalJson: Json = JsonConfig.refine { prettyPrint = false }

    fun canonicalUtf8(element: JsonElement): ByteArray = ...
}
```

2.6 service/RawIdempotencyStore.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.service

import cards.arda.common.lib.lang.errors.AppError
import cards.arda.common.lib.runtime.idempotency.core.IdempotencyKey
import cards.arda.common.lib.runtime.idempotency.core.IdempotencyOutcome
import kotlinx.serialization.json.JsonElement
import java.time.Duration

/**
 * Native idempotency store interface. Operates on [JsonElement] symmetrically:
 * request payloads and result payloads are [JsonElement] at the store
 * boundary. Schema-evolution is the caller's concern.
 *
 * Typed consumers use [IdempotencyStore] (produced by [typedAs]); raw
 * consumers use this interface directly.
 *
 * All methods must execute within the caller's transaction. The factory
 * (see [IdempotencyStoreFactory]) produces an instance bound to a specific
 * transaction context (`inTransaction(connection)`, `inConnection(connection)`,
 * `withTx(tx)`); using one outside that context returns `Result.failure`.
 */
interface RawIdempotencyStore {

    /**
     * Begin (or replay) an attempt under [key] for [request].
     *
     * The store INSERTs an `in_progress` row if no prior record exists, or
     * dispatches against the prior row's status / hash to produce the outcome.
     *
     * @param replayWindowOverride if non-null, overrides the store's default
     *   replay window for this attempt's `expires_at`.
     */
    fun begin(
        key: IdempotencyKey,
        request: JsonElement,
        replayWindowOverride: Duration? = null,
    ): Result<IdempotencyOutcome>

    /**
     * Mark the in-flight attempt under [key] as successfully committed with
     * [result]. Replays of the same (key, request) return
     * [IdempotencyOutcome.PriorResult] of [result].
     */
    fun commit(key: IdempotencyKey, result: JsonElement): Result<Unit>

    /**
     * Mark the in-flight attempt under [key] as a permanent failure with
     * [error]. Replays of the same (key, request) return
     * [IdempotencyOutcome.PriorError] of [error].
     */
    fun failPermanent(key: IdempotencyKey, error: AppError): Result<Unit>

    /**
     * Mark the in-flight attempt under [key] as a transient failure. The row
     * is DELETEd; the next [begin] for the same key returns
     * [IdempotencyOutcome.FreshAttempt].
     */
    fun failTransient(key: IdempotencyKey): Result<Unit>

    /**
     * Delete rows whose `expires_at` is at or before [asOf]. Returns the
     * number of rows deleted. Intended for periodic invocation by the
     * consuming component's scheduler.
     *
     * Scoped to this store's [ConsumerNamespace] only; never touches another
     * consumer's rows.
     */
    fun purgeExpired(asOf: java.time.Instant): Result<Int>
}
```

2.7 service/IdempotencyStore.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.service

import cards.arda.common.lib.lang.errors.AppError
import cards.arda.common.lib.runtime.idempotency.core.IdempotencyKey
import java.time.Duration

/**
 * Typed idempotency store interface produced by [typedAs].
 *
 * Wraps a [RawIdempotencyStore] with `KSerializer<Req>` / `KSerializer<Res>`
 * captured at wiring time. On replay-time decode failures (the bytes in
 * `request_payload` / `result_payload` cannot be deserialized to the
 * current [Req] / [Res]), [begin] / `commit` ops return
 * `Result.failure(AppError.Internal.IncompatibleState(...))` -- decode
 * failures are bug-class.
 */
interface IdempotencyStore<Req, Res> {

    fun begin(
        key: IdempotencyKey,
        request: Req,
        replayWindowOverride: Duration? = null,
    ): Result<TypedIdempotencyOutcome<Req, Res>>

    fun commit(key: IdempotencyKey, result: Res): Result<Unit>

    fun failPermanent(key: IdempotencyKey, error: AppError): Result<Unit>

    fun failTransient(key: IdempotencyKey): Result<Unit>

    fun purgeExpired(asOf: java.time.Instant): Result<Int>
}

/**
 * Typed counterpart of [IdempotencyOutcome]. Same five variants with the
 * `JsonElement` payloads decoded to [Req] / [Res].
 *
 * Note that [Mismatch.recordedRequest] is `Result<Req>` -- if the recorded
 * `JsonElement` cannot decode to the current `Req` (schema drift), the
 * mismatch is still reported; the [Result] surfaces the decode failure
 * separately so callers can log diagnostically without blocking on the
 * decode.
 *
 * Both type parameters are declared `out` so that the `Nothing`-typed
 * variants are assignable to `TypedIdempotencyOutcome<Req, Res>`.
 */
sealed class TypedIdempotencyOutcome<out Req, out Res> {
    data object FreshAttempt : TypedIdempotencyOutcome<Nothing, Nothing>()
    data class PriorResult<Res>(val value: Res) : TypedIdempotencyOutcome<Nothing, Res>()
    data class PriorError(val error: AppError) : TypedIdempotencyOutcome<Nothing, Nothing>()
    data class Mismatch<Req>(
        val recordedHash: ByteArray,
        val submittedHash: ByteArray,
        val recordedRequest: Result<Req>,
    ) : TypedIdempotencyOutcome<Req, Nothing>() {
        override fun equals(other: Any?): Boolean = ...
        override fun hashCode(): Int = ...
    }
    data object InFlight : TypedIdempotencyOutcome<Nothing, Nothing>()
}
```

2.8 service/TypedAs.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.service

import cards.arda.common.lib.lang.serialization.JsonConfig
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer

/**
 * Produce a typed view of a [RawIdempotencyStore].
 *
 * Resolves `KSerializer<Req>` / `KSerializer<Res>` once at call time
 * (typically wiring time, not per-call) using [json]'s [SerializersModule].
 * Callers that need a custom serialization shape (e.g., stable across
 * `Req` schema versions) pass a refined [Json] here.
 *
 * Decode failures at replay time are surfaced as
 * `Result.failure(AppError.Internal.IncompatibleState(...))` from the
 * typed store's methods -- the typed boundary is a place to enforce
 * "schema didn't drift", not a place to recover from drift.
 */
inline fun <reified Req, reified Res> RawIdempotencyStore.typedAs(
    json: Json = JsonConfig.standardJson,
): IdempotencyStore<Req, Res> =
    IdempotencyStoreImpl(
        raw = this,
        json = json,
        reqSerializer = json.serializersModule.serializer<Req>(),
        resSerializer = json.serializersModule.serializer<Res>(),
    )
```

2.9 service/IdempotencyStoreFactory.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.service

import cards.arda.common.lib.runtime.idempotency.core.ConsumerNamespace
import org.jetbrains.exposed.sql.Transaction
import java.sql.Connection
import java.time.Duration

/**
 * Constructs [RawIdempotencyStore] instances bound to a specific transaction
 * context. Mirrors the [DatabaseBackedMap] factory pattern.
 *
 * The factory itself is stateless beyond its configuration; instances are
 * cheap to create and can be wired once per L3 service at module-init time.
 *
 * @param tableName    Postgres table name; typically `idempotency_record`
 *                     (one shared table across consumers; the
 *                     [ConsumerNamespace] partitions rows).
 * @param namespace    The [ConsumerNamespace] this store operates under.
 * @param replayWindow Default replay window for new records (sets
 *                     `expires_at = now + replayWindow`). Callers may
 *                     override per-request via `begin`'s
 *                     `replayWindowOverride` parameter.
 */
class IdempotencyStoreFactory(
    private val tableName: String,
    private val namespace: ConsumerNamespace,
    private val replayWindow: Duration,
) {
    fun inTransaction(connection: Connection): Result<RawIdempotencyStore> = ...
    fun inConnection(connection: Connection): Result<RawIdempotencyStore> = ...
    fun withTx(tx: Transaction): Result<RawIdempotencyStore> = ...
}
```

`inTransaction` rejects the connection if it is in autocommit mode (no transaction); `inConnection` accepts any connection state (the caller manages transactions itself); `withTx` accepts an Exposed `Transaction` directly. The pattern is lifted from `DatabaseBackedMap.kt`.
The factory binds the store to the caller’s transaction without ever opening or closing one on the caller’s behalf, honouring DQ-208 (async-tx boundaries): “L3 services own transactions; common-module helpers must not open or close transactions on the caller’s behalf.”
2.10 api/HeaderParsing.kt
```kotlin
package cards.arda.common.lib.runtime.idempotency.api

import cards.arda.common.lib.lang.errors.AppError
import cards.arda.common.lib.runtime.idempotency.core.ConsumerNamespace
import cards.arda.common.lib.runtime.idempotency.core.IdempotencyKey

/**
 * Parse an `Idempotency-Key` HTTP header value into an [IdempotencyKey]
 * under [namespace].
 *
 * Trims; enforces length cap ([IdempotencyKey.MAX_VALUE_LENGTH]); rejects
 * blank, oversize, or control-character-containing values with
 * [AppError.Invocation.GeneralValidation].
 *
 * Per [sanitizeHeader][cards.arda.common.lib.api.headers.sanitizeHeader] in
 * the L4 inbound pipeline, the value is expected to have already passed
 * value-cleaning; this parser performs only the idempotency-specific
 * format checks.
 */
fun parseIdempotencyHeader(
    namespace: ConsumerNamespace,
    rawValue: String?,
): Result<IdempotencyKey?> = ...
```

The function returns `Result.success(null)` if `rawValue` is null (header not present), `Result.success(key)` on accept, and `Result.failure` on hard rejection.
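A minimal sketch of the three-way contract described above, under stated assumptions: `parseHeaderValue` is a hypothetical stand-in that returns the validated string rather than an `IdempotencyKey` (whose constructor is internal), uses `IllegalArgumentException` in place of `AppError.Invocation.GeneralValidation`, and applies the length cap after trimming, which the design does not specify.

```kotlin
const val MAX_VALUE_LENGTH = 255 // mirrors IdempotencyKey.MAX_VALUE_LENGTH

// Hypothetical stand-in for parseIdempotencyHeader (see assumptions in the lead-in).
fun parseHeaderValue(rawValue: String?): Result<String?> {
    if (rawValue == null) return Result.success(null) // header absent: not an error
    val trimmed = rawValue.trim()
    return when {
        trimmed.isEmpty() ->
            Result.failure(IllegalArgumentException("Idempotency-Key is blank"))
        trimmed.length > MAX_VALUE_LENGTH ->
            Result.failure(IllegalArgumentException("Idempotency-Key exceeds $MAX_VALUE_LENGTH characters"))
        trimmed.any { it.isISOControl() } ->
            Result.failure(IllegalArgumentException("Idempotency-Key contains control characters"))
        else -> Result.success(trimmed)
    }
}

fun main() {
    println(parseHeaderValue(null).getOrThrow())           // null
    println(parseHeaderValue("  req-123  ").getOrThrow())  // req-123
    println(parseHeaderValue("   ").isFailure)             // true
    println(parseHeaderValue("a".repeat(256)).isFailure)   // true
    println(parseHeaderValue("ab\u0000cd").isFailure)      // true
}
```

Keeping "absent" as `success(null)` lets the L4 caller decide whether the header is mandatory for a given endpoint, rather than baking that policy into the parser.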
3. Persistence
3.1 Table DDL

The store assumes a Postgres table created by a Flyway migration in the consuming component (per § 6 below, common-module does not ship production-side migrations):

```sql
CREATE TABLE idempotency_record (
    namespace        TEXT        NOT NULL,
    key_value        TEXT        NOT NULL,
    request_hash     BYTEA       NOT NULL,
    request_payload  JSONB       NOT NULL,
    status           TEXT        NOT NULL
        CHECK (status IN ('in_progress', 'committed', 'failed_permanent')),
    result_payload   JSONB,
    error_payload    JSONB,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at       TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (namespace, key_value)
);

CREATE INDEX idempotency_record_expires_idx ON idempotency_record (expires_at);
```

JSONB rather than BYTEA for the three payload columns: PostgreSQL validates the payload at write time (corrupted bytes cannot sneak in); SQL-level inspection produces readable output (`SELECT result_payload FROM idempotency_record WHERE namespace = 'email-job' LIMIT 10` is operationally useful); JSONB has native indexing if a future need arises (none in v1). The trade-off is that the wire format is committed to JSON end-to-end; switching to CBOR or another binary format later would be a column-type migration. Judged unlikely.
No foreign keys to consumer-owned tables. The idempotency_record schema is intentionally consumer-agnostic: rows are partitioned by ConsumerNamespace (the first column of the primary key) and never reference a consumer-side entity by FK. This honours the Cross-Universe rule (entities owned by different services must not share foreign keys or transactions) even as multiple consumers in the same component share the table.
3.2 begin() concurrency strategy
Up to two SQL operations per `begin()`:
- `INSERT ... ON CONFLICT (namespace, key_value) DO NOTHING`. The insert carries the new row with `status = 'in_progress'`, `request_hash`, `request_payload`, `expires_at`. If the insert succeeds, the caller’s outcome is `FreshAttempt`.
- If the insert was a no-op (the row already exists), follow up with a plain `SELECT` (no `FOR UPDATE`). The store reads the row’s status, hash, and payload (the payload is needed only for `Mismatch`), then dispatches:
  - `status = 'in_progress'` -> `InFlight`.
  - `status = 'committed'`, `request_hash = submitted_hash` -> `PriorResult(result_payload)`.
  - `status = 'failed_permanent'`, `request_hash = submitted_hash` -> `PriorError(error_payload decoded)`.
  - `request_hash != submitted_hash` -> `Mismatch(recorded_hash, submitted_hash, request_payload)`.
No SELECT FOR UPDATE; no row locks; no inter-pod blocking. The InFlight outcome means the second caller backs off and retries at the protocol layer.
Two round-trips in the contended case; one in the common (uncontested) case.
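The insert-then-dispatch flow can be modelled in memory to make the branch order explicit. This is an illustrative sketch only: `InMemoryStore`, `Row`, and the string payloads are hypothetical, `putIfAbsent` stands in for `INSERT ... ON CONFLICT DO NOTHING`, and `String.hashCode()` is a placeholder for the SHA-256 request hash. The dispatch follows the order in which the outcomes are listed above, so an `in_progress` row reports `InFlight` before any hash comparison.

```kotlin
import java.util.concurrent.ConcurrentHashMap

enum class Status { IN_PROGRESS, COMMITTED, FAILED_PERMANENT }

data class Row(val hash: Int, val request: String, val status: Status, val payload: String? = null)

sealed interface Outcome {
    data object FreshAttempt : Outcome
    data object InFlight : Outcome
    data class PriorResult(val value: String) : Outcome
    data class PriorError(val error: String) : Outcome
    data class Mismatch(val recordedHash: Int, val submittedHash: Int, val recordedRequest: String) : Outcome
}

class InMemoryStore {
    private val rows = ConcurrentHashMap<String, Row>()

    fun begin(key: String, request: String): Outcome {
        val submittedHash = request.hashCode() // placeholder for the real request hash
        // Step 1: atomic insert-if-absent; null means the insert won.
        val existing = rows.putIfAbsent(key, Row(submittedHash, request, Status.IN_PROGRESS))
            ?: return Outcome.FreshAttempt
        // Step 2: the row already existed; dispatch on its status and hash.
        return when {
            existing.status == Status.IN_PROGRESS -> Outcome.InFlight
            existing.hash != submittedHash ->
                Outcome.Mismatch(existing.hash, submittedHash, existing.request)
            existing.status == Status.COMMITTED -> Outcome.PriorResult(existing.payload ?: "")
            else -> Outcome.PriorError(existing.payload ?: "")
        }
    }

    fun commit(key: String, result: String) {
        rows.computeIfPresent(key) { _, row -> row.copy(status = Status.COMMITTED, payload = result) }
    }

    fun failPermanent(key: String, error: String) {
        rows.computeIfPresent(key) { _, row -> row.copy(status = Status.FAILED_PERMANENT, payload = error) }
    }

    // Transient failure deletes the row so the next begin() starts fresh.
    fun failTransient(key: String) {
        rows.remove(key)
    }
}

fun main() {
    val store = InMemoryStore()
    println(store.begin("k1", "reqA"))                     // FreshAttempt
    println(store.begin("k1", "reqA"))                     // InFlight
    store.commit("k1", "resA")
    println(store.begin("k1", "reqA"))                     // PriorResult(value=resA)
    println(store.begin("k1", "reqB") is Outcome.Mismatch) // true
}
```

Unlike the SQL version, `putIfAbsent` returns the existing row in the same step, so the model has no second round-trip; the branch structure is the part that carries over.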
3.3 Transition state machine
A begin() call either inserts a fresh row (state FreshAttempt) or short-circuits with a replay outcome read from an existing row. From FreshAttempt, the caller chooses one of three terminal transitions: commit(result), failPermanent(error), or failTransient() (which deletes the row so the next begin() can re-attempt).
4. error_payload projection
`failPermanent(key, error: AppError)` stores `error` in `error_payload`. Per DQ-R1-031, the on-disk shape is a `@Serializable` data class:

```kotlin
@Serializable
internal data class StoredAppError(
    val classTag: String,       // e.g., "AppError.Internal.IncompatibleState"
    val message: String,
    val cause: String? = null,  // toString() of cause, if any; non-recursive
)
```

The cause-chain is not recursively serialized; only the immediate cause’s `toString()` is captured. On replay, the typed wrapper produces `IdempotencyOutcome.PriorError(decodedAppError)` where `decodedAppError` is reconstructed by matching `classTag` against the known `AppError` subtype set. Unknown class tags surface as `AppError.Internal.Implementation` with the original message preserved (operational signal that the consumer’s `AppError` hierarchy has evolved).
The exact StoredAppError shape is fixed in the implementation PR; the on-disk format is private to the store (consumers see AppError, not StoredAppError).
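As a hedged illustration of the `classTag` round trip and the unknown-tag fallback, the sketch below uses a miniature, hypothetical `AppError` hierarchy; the real hierarchy in `cards.arda.common.lib.lang.errors` and its constructors may differ, and the extension names `toStored` / `toAppError` are invented for this example.

```kotlin
// Hypothetical miniature of the AppError hierarchy, for illustration only.
sealed class AppError(val message: String) {
    class IncompatibleState(message: String) : AppError(message)
    class GeneralValidation(message: String) : AppError(message)
    class Implementation(message: String) : AppError(message)
}

data class StoredAppError(val classTag: String, val message: String, val cause: String? = null)

fun AppError.toStored(cause: Throwable? = null): StoredAppError = StoredAppError(
    classTag = when (this) {
        is AppError.IncompatibleState -> "AppError.Internal.IncompatibleState"
        is AppError.GeneralValidation -> "AppError.Invocation.GeneralValidation"
        is AppError.Implementation -> "AppError.Internal.Implementation"
    },
    message = message,
    cause = cause?.toString(), // immediate cause only; the chain is not walked
)

fun StoredAppError.toAppError(): AppError = when (classTag) {
    "AppError.Internal.IncompatibleState" -> AppError.IncompatibleState(message)
    "AppError.Invocation.GeneralValidation" -> AppError.GeneralValidation(message)
    // Known Implementation tag and any unknown tag both land here; the message is kept.
    else -> AppError.Implementation(message)
}

fun main() {
    val stored = AppError.GeneralValidation("bad input").toStored(IllegalStateException("inner"))
    println(stored.toAppError() is AppError.GeneralValidation) // true

    val fromNewerWriter = StoredAppError("AppError.Some.FutureType", "added after this reader shipped")
    val decoded = fromNewerWriter.toAppError()
    println(decoded is AppError.Implementation)                // true
    println(decoded.message)                                   // added after this reader shipped
}
```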
5. Decode-failure handling
The typed wrapper performs three decode operations:
1. `JsonElement -> Req` on `Mismatch.recordedRequest`. Failure is non-fatal; surfaced as `Mismatch(recordedHash, submittedHash, recordedRequest = Result.failure(...))`. The mismatch itself is still actionable.
2. `JsonElement -> Res` on `PriorResult.value`. Failure is fatal; returned as `Result.failure(AppError.Internal.IncompatibleState(...))`. The store can’t fulfil the typed contract; the consumer’s schema has drifted.
3. `JsonElement -> AppError` on `PriorError.error` (via the `StoredAppError` projection above). Failure is fatal; same treatment as `Res` decode failure.
Decode failures are bugs. The consumer changed schema without a coordinated drain or adapter; on-call sees the IncompatibleState page and follows up.
Native (RawIdempotencyStore) consumers don’t see decode failures because they don’t decode — they handle the JsonElement directly.
6. Consumer adoption
For each L3 service that adopts an idempotency store:
- Define a `ConsumerNamespace` constant in the service’s module:
  ```kotlin
  val EMAIL_JOB_NAMESPACE = ConsumerNamespace("email-job")
  ```
- Add the Flyway migration creating the shared `idempotency_record` table (one-time per component, not per service):
  ```sql
  -- V0NN__idempotency_record.sql in operations/.../migrations/
  CREATE TABLE idempotency_record (...); -- per § 3.1
  ```
- At module wiring time, construct the factory and typed view:
  ```kotlin
  val emailJobFactory = IdempotencyStoreFactory(
      tableName = "idempotency_record",
      namespace = EMAIL_JOB_NAMESPACE,
      replayWindow = Duration.ofHours(24),
  )

  // ... inside the L3 service, in the caller's transaction:
  val rawStore = emailJobFactory.withTx(tx).getOrElse { return Result.failure(it) }
  val typedStore: IdempotencyStore<EmailSendRequest, EmailJob> = rawStore.typedAs()
  ```
- Wire a periodic `purgeExpired(Instant.now())` call to the consuming component’s scheduler.
- For outbound (Intent (b)) needs, construct an `IdempotencyKeyMinter` at module wiring:
  ```kotlin
  val emailJobKeyMinter = IdempotencyKeyMinter(EMAIL_JOB_NAMESPACE)
  ```
Per DQ-R1-031, the idempotency_record Flyway migration ships in Phase 5b’s consumer adoption (operations), not in common-module — common-module does not own production-side migrations. See 5a-component-library-updates/goal.md “Out of scope”.
7. Test plan
| Surface | Test type | What it asserts |
|---|---|---|
| ConsumerNamespace | Pure Kotlin | init validation: blank rejected, oversize rejected, illegal characters rejected, legal value accepted. |
| IdempotencyKey | Pure Kotlin | Companion MAX_VALUE_LENGTH. Round-trip serialization through JsonConfig.standardJson. |
| IdempotencyKeyMinter | Pure Kotlin | Deterministic: same parts -> same key across invocations. Different parts -> different key. Empty parts or blank parts -> Result.failure. |
| CanonicalJson.canonicalUtf8 | Pure Kotlin | Same logical input -> same bytes when fields are reordered, when Map is rebuilt with different insertion order, when nested objects swap field order. JsonArray order preserved. |
| parseIdempotencyHeader | Pure Kotlin | Trim, length cap, control-character rejection, blank rejection, null pass-through (Result.success(null)), happy path. |
| IdempotencyStoreFactory.withTx | ContainerizedPostgres | Rejects no-transaction, rejects auto-commit, accepts active transaction. |
| RawIdempotencyStore.begin happy path | ContainerizedPostgres | First call returns FreshAttempt; row inserted with status = 'in_progress'. |
| RawIdempotencyStore lifecycle | ContainerizedPostgres | begin -> commit -> begin returns PriorResult with original JsonElement. begin -> failPermanent -> begin returns PriorError. begin -> failTransient -> begin returns FreshAttempt. |
| Mismatch detection (native) | ContainerizedPostgres | begin(key, reqA) -> commit -> begin(key, reqB) (different body, same key) returns Mismatch(recordedHash, submittedHash, recordedRequest = reqA-as-JsonElement). |
| InFlight outcome | ContainerizedPostgres | begin(key, req) (no commit/fail) -> second begin(key, req) from a different transaction returns InFlight. |
| purgeExpired | ContainerizedPostgres | Deletes rows with expires_at <= asOf for the bound namespace only; leaves other namespaces’ rows alone. |
| replayWindowOverride | ContainerizedPostgres | Per-request override sets expires_at to now + override rather than now + store.replayWindow. |
| Typed wrapper begin happy path | ContainerizedPostgres | Typed begin(key, reqInstance) round-trips: commit(resInstance) then begin(key, reqInstance) returns PriorResult(resInstance) with the original typed value. |
| Typed wrapper decode failure (Res) | ContainerizedPostgres | Res schema drift between commit and replay returns Result.failure(AppError.Internal.IncompatibleState). |
| Typed wrapper decode failure (Req on Mismatch) | ContainerizedPostgres | Req schema drift surfaces as Mismatch.recordedRequest = Result.failure; the mismatch is still reported. |
| Custom Json wiring | ContainerizedPostgres | A typedAs(json = customJson) invocation uses the custom KSerializer<Req> and round-trips through the stable shape. |
Test files live at lib/src/test/kotlin/cards/arda/common/lib/runtime/idempotency/, mirroring the source tree. Integration tests inherit the existing ContainerizedPostgres.fromTestConfig(...) / fromValues(...) patterns from lib/src/test/kotlin/cards/arda/common/lib/testing/persistence/.
8. Knowledge-base extraction (post-merge)
The canonical-JSON walker and the typed-wrapper-via-reified-inline-extension pattern are reusable. After the implementation PR merges, extract two knowledge-base entries to `common-module/knowledge-base/`:
1. `canonical-json-hashing.md`: when to canonicalise serialized JSON, the `JsonElement`-walk recipe, the `JsonConfig.refine` pattern, gotchas (`Map` / `Set` insertion order; `JsonArray` order is semantically meaningful).
2. `reified-inline-extension-factory.md`: the `inline fun <reified T> ReceiverType.factoryName(...)` pattern; how it composes with `Json.serializersModule.serializer<T>()`.
These are not deliverables of this design; flag them in the implementation PR as follow-ups.
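For orientation, the canonical-walk recipe named in the first entry can be sketched without any library. The version below is an assumption-laden stand-in, not the helper itself: it walks plain Kotlin `Map` / `List` values instead of kotlinx `JsonElement`, supports only `String`, `Int`, `Long`, `Boolean`, and `null` primitives, and the function names are hypothetical. Object keys are sorted; array order is preserved.

```kotlin
// Stand-in value model: Map<String, *> for objects, List<*> for arrays.
fun canonicalize(value: Any?): String = when (value) {
    null -> "null"
    is Boolean, is Int, is Long -> value.toString()
    is String -> quote(value)
    is List<*> -> value.joinToString(",", "[", "]") { canonicalize(it) } // order preserved
    is Map<*, *> -> value.entries
        .sortedBy { it.key as String }                                   // keys sorted
        .joinToString(",", "{", "}") { (k, v) -> quote(k as String) + ":" + canonicalize(v) }
    else -> throw IllegalArgumentException("unsupported value: $value")
}

// Minimal JSON string escaping: quotes, backslashes, and control characters.
fun quote(s: String): String = buildString {
    append('"')
    for (c in s) when {
        c == '"' -> append("\\\"")
        c == '\\' -> append("\\\\")
        c < ' ' -> append("\\u%04x".format(c.code))
        else -> append(c)
    }
    append('"')
}

fun canonicalUtf8(value: Any?): ByteArray = canonicalize(value).toByteArray(Charsets.UTF_8)

fun main() {
    val a = linkedMapOf("b" to 1, "a" to listOf(3, 1, 2), "c" to mapOf("y" to true, "x" to null))
    val b = linkedMapOf("c" to mapOf("x" to null, "y" to true), "a" to listOf(3, 1, 2), "b" to 1)
    println(canonicalize(a))                                  // {"a":[3,1,2],"b":1,"c":{"x":null,"y":true}}
    println(canonicalUtf8(a).contentEquals(canonicalUtf8(b))) // true: insertion order does not matter
}
```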
9. Risks and mitigations
| Risk | Mitigation |
|---|---|
| A consumer’s Req evolves in a way that changes serialization bytes (e.g., field rename), so in-flight records hash differently after the change. | Documented as caller responsibility. Consumer either: (a) drains the table operationally before deploying (purgeExpired once the in-flight window has elapsed), or (b) writes a custom KSerializer<Req> (registered in a refined Json passed to typedAs(json = ...)) that emits a stable shape across schema versions, or (c) uses RawIdempotencyStore directly and projects Req to a stable JsonElement. |
| The single shared idempotency_record table grows unbounded if no consumer wires purgeExpired. | Operator responsibility documented in § 6. Default 24-hour replay window means unscheduled purges still bound growth by replay_window × arrival_rate. |
| JSONB validation at write time rejects payloads the helper would otherwise accept. | Implementation PR runs ContainerizedPostgres tests with synthetic large / edge-case payloads to confirm PG accepts them. |
| AppError projection for error_payload loses cause-chain information. | Documented as intentional in § 4. Cause chains are not durable serializable state; the immediate cause’s toString() is preserved. |
| Future protocol adapters (gRPC / SQS) outgrow the api/ flat layout. | When the second protocol’s helper lands, split api/ into api/rest/, api/grpc/, etc. Cost is one round of import updates; no API change. |
| Decode failures on Res (the fatal case) flap in production due to consumer-side schema drift the consumer didn’t notice. | Sentry on AppError.Internal.IncompatibleState surfaces the rate; the typed-wrapper error message carries the offending classTag (or Res decode failure detail) for triage. The recovery is “consumer fixes their schema or adds a compatibility adapter”. |
10. References
Documents
- `index.md` § 5: pointer block in the main Phase 5a design.
- `../goal.md`: Phase 5a goal and success criteria.
- `../task-plan.md`: PR #5 sequencing.
- `../../decision-log.md`: `DQ-R1-031` (this design’s governing decision).

Existing common-module references
- `JsonConfig.standardJson`: canonical kotlinx `Json` instance.
- `JsonConfig.refine`: variant-`Json` builder for `prettyPrint = false`.
- `DatabaseBackedMap.kt`: factory-with-three-variants pattern (`inTransaction` / `inConnection` / `withTx`).
- `Message.kt`: `companion inline operator fun <reified T : Any> invoke(...)` pattern (analogous to but not identical to the `typedAs` extension; `Message` uses companion-invoke for non-store-shaped factories).
- `ContainerizedPostgres`: integration-test pattern (real Postgres container) for the store’s lifecycle tests.

Workspace standards
- `kotlin-coding`: governs all Kotlin sketches in this document.
- `plantuml-guide`: diagram conventions.
Copyright: © Arda Systems 2025-2026, All rights reserved