Package com.mimecast.robin.queue
Class SQLQueueDatabase<T extends Serializable>
java.lang.Object
com.mimecast.robin.queue.SQLQueueDatabase<T>
- Type Parameters:
T- payload type
- All Implemented Interfaces:
QueueDatabase<T>,Closeable,AutoCloseable
- Direct Known Subclasses:
QueueMariaDB,QueuePgSQL
public abstract class SQLQueueDatabase<T extends Serializable>
extends Object
implements QueueDatabase<T>
SQL-backed scheduled work queue.
-
Nested Class Summary
Nested Classes -
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionbooleanacknowledge(String uid) Acknowledges successful completion and removes the item from the active queue.voidapplyMutations(QueueMutationBatch<T> batch) Applies a batch of dequeue outcomes and derived enqueues atomically when supported.private intbindFilter(PreparedStatement statement, QueueListFilter filter, int startIndex) private voidbindItem(PreparedStatement statement, QueueItem<T> item) private StringbuildWhereClause(QueueListFilter filter) claimReady(int limit, long nowEpochSeconds, String consumerId, long claimUntilEpochSeconds) Claims ready items for the given consumer until the lease expires.claimReadyLegacy(int limit, long nowEpochSeconds, String consumerId, long claimUntilEpochSeconds) voidclear()Clears the full queue state, including dead items.voidclose()private longcountByState(QueueItemState state) private voiddeleteBatch(Connection connection, List<String> ackUids) booleandeleteByUID(String uid) Removes the item by queue UID regardless of state.intdeleteByUIDs(List<String> uids) Removes multiple items by queue UID regardless of state.Inserts a ready queue item.fetchByUIDs(Connection connection, List<String> uids) Returns the item by queue UID.protected abstract Stringprotected abstract StringvoidInitialize external resources.private voidinsertNewItems(Connection connection, List<T> newItems) private Stringlist(int offset, int limit, QueueListFilter filter) Returns a paged queue listing ordered by creation time.booleanMarks an item as dead.private longminEpochForState(String column, QueueItemState state) intreleaseExpiredClaims(long nowEpochSeconds) Releases expired claims back to READY.booleanreschedule(QueueItem<T> item, long nextAttemptAtEpochSeconds, String lastError) Reschedules a claimed item for a future attempt.private static voidsetNullableString(PreparedStatement statement, int index, String value) longsize()Active queue size.stats()Aggregate queue statistics.private voidupdateDead(Connection connection, List<QueueMutation<T>> mutations) private voidupdateReschedules(Connection connection, List<QueueMutation<T>> mutations) private StringvalidateTableName(String tableName)
-
Field Details
-
log
private static final org.apache.logging.log4j.Logger log -
tableName
-
jdbcUrl
-
username
-
password
-
maxPoolSize
private final int maxPoolSize -
dataSource
private com.zaxxer.hikari.HikariDataSource dataSource
-
-
Constructor Details
-
SQLQueueDatabase
-
-
Method Details
-
getDatabaseType
-
getCreateTableSQL
-
getCreateIndexSQL
-
initialize
public void initialize()Description copied from interface:QueueDatabaseInitialize external resources.- Specified by:
initializein interfaceQueueDatabase<T extends Serializable>
-
enqueue
Description copied from interface:QueueDatabaseInserts a ready queue item.- Specified by:
enqueuein interfaceQueueDatabase<T extends Serializable>
-
applyMutations
Description copied from interface:QueueDatabaseApplies a batch of dequeue outcomes and derived enqueues atomically when supported.- Specified by:
applyMutationsin interfaceQueueDatabase<T extends Serializable>
-
claimReady
public List<QueueItem<T>> claimReady(int limit, long nowEpochSeconds, String consumerId, long claimUntilEpochSeconds) Description copied from interface:QueueDatabaseClaims ready items for the given consumer until the lease expires.- Specified by:
claimReadyin interfaceQueueDatabase<T extends Serializable>
-
acknowledge
Description copied from interface:QueueDatabaseAcknowledges successful completion and removes the item from the active queue.- Specified by:
acknowledgein interfaceQueueDatabase<T extends Serializable>
-
reschedule
Description copied from interface:QueueDatabaseReschedules a claimed item for a future attempt.- Specified by:
reschedulein interfaceQueueDatabase<T extends Serializable>
-
releaseExpiredClaims
public int releaseExpiredClaims(long nowEpochSeconds) Description copied from interface:QueueDatabaseReleases expired claims back to READY.- Specified by:
releaseExpiredClaimsin interfaceQueueDatabase<T extends Serializable>
-
markDead
Description copied from interface:QueueDatabaseMarks an item as dead.- Specified by:
markDeadin interfaceQueueDatabase<T extends Serializable>
-
size
public long size()Description copied from interface:QueueDatabaseActive queue size.- Specified by:
sizein interfaceQueueDatabase<T extends Serializable>
-
stats
Description copied from interface:QueueDatabaseAggregate queue statistics.- Specified by:
statsin interfaceQueueDatabase<T extends Serializable>
-
list
Description copied from interface:QueueDatabaseReturns a paged queue listing ordered by creation time.- Specified by:
listin interfaceQueueDatabase<T extends Serializable>
-
getByUID
Description copied from interface:QueueDatabaseReturns the item by queue UID.- Specified by:
getByUIDin interfaceQueueDatabase<T extends Serializable>
-
deleteByUID
Description copied from interface:QueueDatabaseRemoves the item by queue UID regardless of state.- Specified by:
deleteByUIDin interfaceQueueDatabase<T extends Serializable>
-
deleteByUIDs
Description copied from interface:QueueDatabaseRemoves multiple items by queue UID regardless of state.- Specified by:
deleteByUIDsin interfaceQueueDatabase<T extends Serializable>
-
clear
public void clear()Description copied from interface:QueueDatabaseClears the full queue state, including dead items.- Specified by:
clearin interfaceQueueDatabase<T extends Serializable>
-
close
public void close()- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceCloseable
-
claimReadyLegacy
-
insertSql
-
deleteBatch
- Throws:
SQLException
-
updateReschedules
private void updateReschedules(Connection connection, List<QueueMutation<T>> mutations) throws SQLException - Throws:
SQLException
-
updateDead
private void updateDead(Connection connection, List<QueueMutation<T>> mutations) throws SQLException - Throws:
SQLException
-
insertNewItems
- Throws:
SQLException
-
bindItem
- Throws:
SQLException
-
readItem
- Throws:
SQLException
-
fetchByUIDs
private List<QueueItem<T>> fetchByUIDs(Connection connection, List<String> uids) throws SQLException - Throws:
SQLException
-
countByState
-
minEpochForState
-
buildWhereClause
-
bindFilter
private int bindFilter(PreparedStatement statement, QueueListFilter filter, int startIndex) throws SQLException - Throws:
SQLException
-
setNullableString
private static void setNullableString(PreparedStatement statement, int index, String value) throws SQLException - Throws:
SQLException
-
validateTableName
-