Tuesday, 18 May 2021

Is there a design pattern that allows the outbox pattern to coexist with the repository pattern?

Problem Overview

I recently learnt about the Transactional Outbox Pattern and am trying to combine it with my existing development practices, where I typically put an interface around my data storage technology (sometimes SQL-based, sometimes NoSQL). My approach is similar to the Repository Pattern.

Implementing a fully-featured transactional outbox is hard enough that I want to generalise it across my services, but I hit a stumbling block:

Writing to the outbox must be done within the same transaction as any related state changes, but the repository pattern makes this difficult.

What design patterns exist that allow a library (such as an outbox pattern implementation) to take part in transactions created by repository pattern implementations? How do others generally solve this issue?

Example code

Whilst I believe my question is language-agnostic, I think the fact that I am trying to solve it in Go makes the problem more difficult, so I will use Go to illustrate my point:

Naive repository example

A naive repository may have an interface like the following:

// repository code
type SomeRepository interface {
  DoThing(...)
}

// caller code
func Example(r SomeRepository) {
  r.DoThing()
}

This approach is simple but provides users with no way to control transaction boundaries.

Slightly less naive repository example

An approach I've adopted is the following:

// repository code
type SomeRepository interface {
  WithTransaction(f func(txn SomeRepositoryTxn) error) error
}

type SomeRepositoryTxn interface {
  DoThing(...)
}

// caller code
func Example(r SomeRepository) error {
  return r.WithTransaction(func(txn SomeRepositoryTxn) error {
    txn.DoThing()
    return nil
  })
}
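
An implementation of WithTransaction usually wraps the callback in a "commit on success, roll back on error" skeleton. The following is only a minimal sketch of that skeleton, not part of the original design: `committer`, `runInTx`, `fakeTx` and `errAbort` are hypothetical names introduced for illustration. The only fact relied on is that `*sql.Tx` from `database/sql` has `Commit() error` and `Rollback() error` methods, so it would satisfy `committer`.

```go
package main

import (
	"errors"
	"fmt"
)

// committer is a hypothetical interface covering the two calls that end a
// transaction. *sql.Tx from database/sql has both methods.
type committer interface {
	Commit() error
	Rollback() error
}

// runInTx runs f and commits txn only if f returns nil; otherwise it rolls
// back and returns f's error. A panic inside f is not handled here.
func runInTx(txn committer, f func() error) error {
	if err := f(); err != nil {
		if rbErr := txn.Rollback(); rbErr != nil {
			return fmt.Errorf("%w (rollback also failed: %v)", err, rbErr)
		}
		return err
	}
	return txn.Commit()
}

// fakeTx is a hypothetical transaction that records how it was finished.
type fakeTx struct {
	committed, rolledBack bool
}

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

// errAbort is a sample error that a callback might return.
var errAbort = errors.New("abort")
```

A SQL-backed WithTransaction could begin a `*sql.Tx`, wrap it in its own SomeRepositoryTxn implementation, and hand both to a helper of this shape; the callback would close over the wrapped transaction.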

Pain point: an outbox's publish interface

Here's the main issue: how can a generic outbox library take part in the transactions of repositories like those shown above?

A mysterious transaction type

If you try an API like the following, you'll struggle to represent transactions in a general way (there is sql.Tx, but that assumes an SQL database, and so on):

type SomeMessageType = ...
type SomeTransactionType = ...

type OutboxWriter interface {
  Queue(txn SomeTransactionType, msg SomeMessageType) error
}
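
One way the "mysterious" type could be expressed is as a type parameter, so that the outbox library never names a concrete transaction type. This is only a sketch under assumptions: it needs Go 1.18 or later (generics), and `Message`, `fakeTx`, `fakeWriter` and `publishAll` are hypothetical names, not part of any existing library. It does not by itself solve how a repository hands its transaction to the writer.

```go
package main

// Message is a hypothetical stand-in for SomeMessageType.
type Message struct {
	Topic   string
	Payload []byte
}

// OutboxWriter is parameterised over the transaction handle type, so the
// outbox library does not need to know what a transaction looks like.
type OutboxWriter[Tx any] interface {
	Queue(txn Tx, msg Message) error
}

// fakeTx is a hypothetical transaction handle for an in-memory store.
type fakeTx struct {
	queued []Message
}

// fakeWriter implements OutboxWriter[*fakeTx].
type fakeWriter struct{}

func (fakeWriter) Queue(txn *fakeTx, msg Message) error {
	txn.queued = append(txn.queued, msg)
	return nil
}

// publishAll is library-style code that works with any transaction type.
func publishAll[Tx any](w OutboxWriter[Tx], txn Tx, msgs ...Message) error {
	for _, m := range msgs {
		if err := w.Queue(txn, m); err != nil {
			return err
		}
	}
	return nil
}
```

An SQL-backed writer would then implement `OutboxWriter[*sql.Tx]`, while a NoSQL one would pick whatever handle its driver provides. The cost is that the type parameter tends to spread to every caller that touches the writer.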

Someone else's problem

So far, my solution is to give up and make publishing to the outbox "someone else's problem". The outbox needs a persistence implementation for its own processing; that same implementation is also expected to provide a utility function that repositories call, with a technology-specific transaction handle, when they write to the outbox.

E.g.:

// -----------------------
// generic outbox code
package outbox

type OutboxStorage interface {
  // methods outbox needs to do most of its processing
  // but crucially DOES NOT include writing to the outbox
}

// the outbox just uses the simple interface, writing to the outbox
// is "someone else's problem"
type Outbox struct {
  Storage OutboxStorage
}

// -----------------------
// outbox storage engine implementation 
package mysql

type MySQLStorage struct {}

// MySQLStorage implements the outbox.OutboxStorage interface

// the storage implementation provides a _utility_ method
func Queue(txn *sql.Tx, msg SomeMessageType) error

// -----------------------
// repository code
package repo

type SomeRepository interface {
  WithTransaction(f func(txn SomeRepositoryTxn) error) error
}

type SomeRepositoryTxn interface {
  DoThing(...)
  Queue(msg SomeMessageType) error
  // ^- the repository must expose a queue method
  //    and is therefore responsible for ensuring it occurs
  //    within a single transaction
}

// -----------------------
// example mysql repository implementation
package repo

type MySQLSomeRepository struct {
  db *sql.DB
}

// MySQLSomeRepository implements repo.SomeRepository

type MySQLSomeRepositoryTxn struct {
  txn *sql.Tx
}

// MySQLSomeRepositoryTxn implements repo.SomeRepositoryTxn, specifically:
func (m *MySQLSomeRepositoryTxn) Queue(msg SomeMessageType) error {
  return mysql.Queue(m.txn, msg)
}

// -----------------------
// caller code

func Example(r repo.SomeRepository) error {
  return r.WithTransaction(func(txn repo.SomeRepositoryTxn) error {
    txn.DoThing()
    txn.Queue(SomeMessageType{})
    return nil
  })
}
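
To check that a design of this shape keeps the state change and the outbox write atomic, an in-memory fake of the repository can help in tests. The following is a minimal sketch under assumptions: `memStore`, `memTxn` and `recordThing` are hypothetical, `SomeMessageType` is given an invented concrete shape, and `DoThing` is given an invented signature because the original elides it.

```go
package main

import "errors"

// SomeMessageType is given a hypothetical concrete shape for this sketch.
type SomeMessageType struct{ Body string }

// SomeRepositoryTxn follows the interface above; DoThing's signature is an
// assumption, since the original elides it.
type SomeRepositoryTxn interface {
	DoThing(item string) error
	Queue(msg SomeMessageType) error
}

// memStore is a hypothetical in-memory stand-in for a real database.
// It is not safe for concurrent use.
type memStore struct {
	things []string
	outbox []SomeMessageType
}

// memTxn buffers writes until the transaction commits.
type memTxn struct {
	things []string
	outbox []SomeMessageType
}

func (t *memTxn) DoThing(item string) error {
	if item == "" {
		return errors.New("empty item")
	}
	t.things = append(t.things, item)
	return nil
}

func (t *memTxn) Queue(msg SomeMessageType) error {
	t.outbox = append(t.outbox, msg)
	return nil
}

// WithTransaction applies the buffered state change and outbox messages
// together, or not at all if f returns an error.
func (s *memStore) WithTransaction(f func(txn SomeRepositoryTxn) error) error {
	txn := &memTxn{}
	if err := f(txn); err != nil {
		return err // buffered writes are discarded
	}
	s.things = append(s.things, txn.things...)
	s.outbox = append(s.outbox, txn.outbox...)
	return nil
}

// recordThing queues a message and then changes state in one transaction.
func recordThing(s *memStore, item string) error {
	return s.WithTransaction(func(txn SomeRepositoryTxn) error {
		if err := txn.Queue(SomeMessageType{Body: "did " + item}); err != nil {
			return err
		}
		return txn.DoThing(item)
	})
}
```

Calling `recordThing` with an empty item makes `DoThing` fail after the message was queued, and neither the item nor the message reaches the store. That is the property the outbox pattern relies on.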
