tower_batch_control::worker

Struct Worker

pub struct Worker<T, Request>
where
    T: Service<BatchControl<Request>>,
    T::Future: Send + 'static,
    T::Error: Into<Box<dyn Error + Send + Sync + 'static>>,
{
    rx: UnboundedReceiver<Message<Request, T::Future>>,
    service: T,
    pending_items: usize,
    pending_batch_timer: Option<Pin<Box<Sleep>>>,
    concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>,
    failed: Option<ServiceError>,
    error_handle: ErrorHandle,
    close: PollSemaphore,
    max_items_in_batch: usize,
    max_concurrent_batches: usize,
    max_latency: Duration,
}

Task that handles processing the buffer. This type should not be used directly; instead, Buffer requires an Executor that can accept this task.

The struct is pub in a private module, and the type is not re-exported as part of the public API. This is the “sealed” pattern for including “private” types in public traits that consumers of the library are not meant to implement (only call).
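The pattern described above can be sketched in a few lines. This is a simplified illustration, not the crate's real layout: the `worker` module, the one-field `Worker`, the `Executor` trait, and `InlineExecutor` are all hypothetical names chosen for the example.

```rust
// Hypothetical, simplified names: they mirror the shape of the pattern,
// not the actual tower_batch_control API.
mod worker {
    /// `pub`, so it may appear in public signatures. The enclosing module is
    /// private and the type is never re-exported, so other crates cannot name it.
    pub struct Worker {
        pending_items: usize,
    }

    impl Worker {
        /// Only this crate can construct a worker.
        pub(crate) fn new(pending_items: usize) -> Self {
            Worker { pending_items }
        }

        /// Consumers may still call methods on a worker they are handed.
        pub fn run(self) -> usize {
            self.pending_items
        }
    }
}

/// Public trait whose signature mentions the unnameable type. Downstream
/// crates cannot write this method signature, so they cannot implement it.
pub trait Executor {
    fn spawn(&mut self, task: worker::Worker) -> usize;
}

/// In-crate executor that simply runs the task inline.
pub struct InlineExecutor;

impl Executor for InlineExecutor {
    fn spawn(&mut self, task: worker::Worker) -> usize {
        task.run()
    }
}
```

Inside the defining crate, `InlineExecutor.spawn(worker::Worker::new(3))` works as usual; outside it, the path `worker::Worker` is not reachable.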

Fields§

§rx: UnboundedReceiver<Message<Request, T::Future>>

A channel for receiving requests from the batch wrapper service. The channel itself is unbounded; its capacity is enforced by the wrapper service’s semaphore.

§service: T

The wrapped service that processes batches.

§pending_items: usize

The number of pending items sent to the service since the last batch flush.

§pending_batch_timer: Option<Pin<Box<Sleep>>>

The timer for the pending batch, if it has any items.

The timer is started when the first entry of a new batch is submitted, so that the batch latency of all entries is at most self.max_latency. However, to prevent wakeups on idle services, we don’t keep the timer running unless there is a pending request.
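A minimal sketch of this lazy timer, using only the standard library. It assumes a plain `Instant` deadline in place of the real `Pin<Box<Sleep>>`, and the `PendingBatch` type and its methods are hypothetical names, not part of the crate.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the worker's timer bookkeeping.
struct PendingBatch {
    pending_items: usize,
    /// Plays the role of `pending_batch_timer`: `None` while the batch is empty.
    deadline: Option<Instant>,
    max_latency: Duration,
}

impl PendingBatch {
    fn new(max_latency: Duration) -> Self {
        PendingBatch { pending_items: 0, deadline: None, max_latency }
    }

    /// Records one item. The deadline is armed only by the first item of a
    /// batch, so every item in the batch waits at most `max_latency`.
    fn push(&mut self, now: Instant) {
        self.pending_items += 1;
        if self.deadline.is_none() {
            self.deadline = Some(now + self.max_latency);
        }
    }

    /// True once the armed deadline has passed; always false while idle,
    /// which is what avoids wakeups on an idle service.
    fn is_expired(&self, now: Instant) -> bool {
        self.deadline.map_or(false, |deadline| now >= deadline)
    }

    /// Flushes the batch: returns the item count and disarms the timer.
    fn flush(&mut self) -> usize {
        self.deadline = None;
        std::mem::take(&mut self.pending_items)
    }
}
```

Passing `now` explicitly keeps the sketch deterministic; an async worker would instead poll a sleep future that it creates on the first item and drops on flush.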

§concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>

The batches that the worker is concurrently executing.

§failed: Option<ServiceError>

An error that’s populated on permanent service failure.

§error_handle: ErrorHandle

A shared error handle that’s populated on permanent service failure.

§close: PollSemaphore

A cloned copy of the wrapper service’s semaphore, used to close the semaphore.

§max_items_in_batch: usize

The maximum number of items allowed in a batch.

§max_concurrent_batches: usize

The maximum number of batches that are allowed to run concurrently.

§max_latency: Duration

The maximum delay before processing a batch with fewer than max_items_in_batch.
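Taken together, max_items_in_batch and max_latency suggest a two-part flush rule. The function below is a sketch of that rule under the assumption that a batch is flushed as soon as either limit is reached; `should_flush` is a hypothetical helper, not a function in the crate.

```rust
use std::time::Duration;

/// Hypothetical helper: decides whether the pending batch should be flushed.
///
/// `waited` is the time since the first item of the batch was submitted.
fn should_flush(
    pending_items: usize,
    max_items_in_batch: usize,
    waited: Duration,
    max_latency: Duration,
) -> bool {
    // An empty batch is never flushed, however long the worker has been idle.
    if pending_items == 0 {
        return false;
    }
    // Flush when the batch is full, or when its oldest item has waited long enough.
    pending_items >= max_items_in_batch || waited >= max_latency
}
```

With a limit of 4 items and 10 ms, a batch of 4 flushes immediately, while a batch of 1 flushes only once 10 ms have passed.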

Implementations§

source§

impl<T, Request> Worker<T, Request>
where T: Service<BatchControl<Request>>, T::Future: Send + 'static, T::Error: Into<Box<dyn Error + Send + Sync + 'static>>,

pub(crate) fn project<'pin>( self: Pin<&'pin mut Self>, ) -> __WorkerProjection<'pin, T, Request>

pub(crate) fn project_ref<'pin>( self: Pin<&'pin Self>, ) -> __WorkerProjectionRef<'pin, T, Request>

source§

impl<T, Request> Worker<T, Request>
where T: Service<BatchControl<Request>>, T::Future: Send + 'static, T::Error: Into<Box<dyn Error + Send + Sync + 'static>>,

pub(crate) fn new( service: T, rx: UnboundedReceiver<Message<Request, T::Future>>, max_items_in_batch: usize, max_concurrent_batches: usize, max_latency: Duration, close: PollSemaphore, ) -> (ErrorHandle, Worker<T, Request>)

Creates a new batch worker.

See Batch::new() for details.

async fn process_req( &mut self, req: Request, tx: Sender<Result<T::Future, ServiceError>>, )

Process a single worker request.

async fn flush_service(&mut self)

Tell the inner service to flush the current batch.

Waits until the inner service is ready, then stores a future which resolves when the batch finishes.

fn can_spawn_new_batches(&self) -> bool

Returns true if the current number of concurrent batches is below the configured limit, so that a new batch can be started.
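Going by the method name, the check reads most naturally as "true while there is room for another batch". The sketch below assumes that reading. `BatchSlots`, `try_start`, and `finish` are hypothetical, and a `Vec` of batch ids stands in for the real `FuturesUnordered` collection.

```rust
/// Hypothetical stand-in for the worker's concurrency bookkeeping.
struct BatchSlots {
    /// Ids of in-flight batches; stands in for `concurrent_batches`.
    concurrent_batches: Vec<u64>,
    max_concurrent_batches: usize,
}

impl BatchSlots {
    /// True while the number of in-flight batches is below the limit.
    fn can_spawn_new_batches(&self) -> bool {
        self.concurrent_batches.len() < self.max_concurrent_batches
    }

    /// Starts a batch if a slot is free; returns whether it was started.
    fn try_start(&mut self, id: u64) -> bool {
        if !self.can_spawn_new_batches() {
            return false;
        }
        self.concurrent_batches.push(id);
        true
    }

    /// Frees the slot held by a finished batch.
    fn finish(&mut self, id: u64) {
        self.concurrent_batches.retain(|batch| *batch != id);
    }
}
```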

pub async fn run(self)

Run loop for batch requests, which implements the batch policies.

See Batch::new() for details.

fn failed(&mut self, error: Box<dyn Error + Send + Sync + 'static>)

Register an inner service failure.

The underlying service failed with the given error when we called poll_ready on it. We need to communicate this to all the Buffer handles. To do so, we wrap the error in an Arc, send that Arc<E> to all pending requests, and store it so that subsequent requests also fail with the same error.
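The Arc-sharing step can be sketched with the standard library alone. `SharedError` and `FailureState` are hypothetical stand-ins for ServiceError and the worker's failed field, and keeping the first error when a second failure arrives is an assumption of this sketch.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Hypothetical stand-in for `ServiceError`: cloning only bumps a refcount,
/// so every request can receive the same underlying error.
#[derive(Clone, Debug)]
struct SharedError {
    inner: Arc<BoxError>,
}

impl fmt::Display for SharedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.inner)
    }
}

impl Error for SharedError {}

/// Hypothetical stand-in for the worker's `failed` field.
struct FailureState {
    failed: Option<SharedError>,
}

impl FailureState {
    /// Registers a failure and returns the shared error to hand to requests.
    /// Assumption: the first recorded error wins; later ones are dropped.
    fn fail(&mut self, error: BoxError) -> SharedError {
        if let Some(existing) = &self.failed {
            return existing.clone();
        }
        let shared = SharedError { inner: Arc::new(error) };
        self.failed = Some(shared.clone());
        shared
    }
}
```

Every clone points at the same allocation, so pending and later requests all observe the identical error value.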

Trait Implementations§

source§

impl<T, Request: Debug> Debug for Worker<T, Request>
where T: Service<BatchControl<Request>> + Debug, T::Future: Send + 'static + Debug, T::Error: Into<Box<dyn Error + Send + Sync + 'static>> + Debug, T::Response: Debug,

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
source§

impl<T, Request> Drop for Worker<T, Request>
where T: Service<BatchControl<Request>>, T::Future: Send + 'static, T::Error: Into<Box<dyn Error + Send + Sync + 'static>>,

source§

fn drop(&mut self)

Executes the destructor for this type.
source§

impl<'pin, T, Request> Unpin for Worker<T, Request>
where T: Service<BatchControl<Request>>, T::Future: Send + 'static, T::Error: Into<Box<dyn Error + Send + Sync + 'static>>, PinnedFieldsOf<__Worker<'pin, T, Request>>: Unpin,

Auto Trait Implementations§

§

impl<T, Request> !Freeze for Worker<T, Request>

§

impl<T, Request> !RefUnwindSafe for Worker<T, Request>

§

impl<T, Request> Send for Worker<T, Request>
where <T as Service<BatchControl<Request>>>::Error: Sized, T: Send, Request: Send,

§

impl<T, Request> !Sync for Worker<T, Request>

§

impl<T, Request> !UnwindSafe for Worker<T, Request>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoEither for T

source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
§

impl<T> Pointable for T

§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

source§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.