pub struct Worker<T, Request>
where
    T: Service<BatchControl<Request>>,
    T::Future: Send + 'static,
    T::Error: Into<Box<dyn Error + Send + Sync + 'static>>,
{
    rx: UnboundedReceiver<Message<Request, T::Future>>,
    service: T,
    pending_items: usize,
    pending_batch_timer: Option<Pin<Box<Sleep>>>,
    concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>,
    failed: Option<ServiceError>,
    error_handle: ErrorHandle,
    close: PollSemaphore,
    max_items_in_batch: usize,
    max_concurrent_batches: usize,
    max_latency: Duration,
}
Task that handles processing the buffer. This type should not be used directly; instead, Buffer requires an Executor that can accept this task.
The struct is pub in the private module, and the type is not re-exported as part of the public API. This is the “sealed” pattern: it lets “private” types appear in public traits that consumers of the library are meant to call but not implement.
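The pattern described above can be sketched in a few lines. This is a minimal illustration, not this crate's code: the names `private`, `Task`, `Executor`, `InlineExecutor`, and `submit` are all hypothetical.

```rust
// Hypothetical crate layout; none of these names come from the crate itself.
mod private {
    // `pub` inside a private module: usable in public signatures within
    // this crate, but other crates cannot reach it by path.
    pub struct Task {
        pub id: u32,
    }
}

pub trait Executor {
    // Other crates cannot write this signature themselves, because they
    // cannot name `private::Task`, so they cannot implement `Executor`.
    fn run(&mut self, task: private::Task) -> u32;
}

// An implementation provided by the crate that owns the private type.
pub struct InlineExecutor;

impl Executor for InlineExecutor {
    fn run(&mut self, task: private::Task) -> u32 {
        task.id
    }
}

// Crate-provided entry point that builds the private task for the caller.
pub fn submit<E: Executor>(executor: &mut E, id: u32) -> u32 {
    executor.run(private::Task { id })
}
```

In this sketch a consumer would only ever go through `submit` with an executor supplied by the crate; the private task type never needs to be named outside it.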
Fields§
rx: UnboundedReceiver<Message<Request, T::Future>>
A semaphore-bounded channel for receiving requests from the batch wrapper service.
service: T
The wrapped service that processes batches.
pending_items: usize
The number of pending items sent to service since the last batch flush.
pending_batch_timer: Option<Pin<Box<Sleep>>>
The timer for the pending batch, if it has any items.
The timer is started when the first entry of a new batch is submitted, so that the batch latency of all entries is at most self.max_latency. However, we don’t keep the timer running unless there is a pending request to prevent wakeups on idle services.
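The timer rule described above can be sketched without an async runtime. This is an illustrative model only: `PendingBatch` and its methods are hypothetical, and a plain `Instant` deadline stands in for the `Sleep` future the worker actually stores.

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-in for the worker's timer bookkeeping.
struct PendingBatch {
    pending_items: usize,
    // Plays the role of `pending_batch_timer`: `None` while the batch is empty.
    deadline: Option<Instant>,
    max_latency: Duration,
}

impl PendingBatch {
    fn new(max_latency: Duration) -> Self {
        Self { pending_items: 0, deadline: None, max_latency }
    }

    // Record one item; only the first item of a new batch arms the deadline,
    // so every item in the batch waits at most `max_latency`.
    fn push(&mut self, now: Instant) {
        if self.pending_items == 0 {
            self.deadline = Some(now + self.max_latency);
        }
        self.pending_items += 1;
    }

    // Flushing clears the deadline, so an idle worker has nothing to wake for.
    fn flush(&mut self) -> usize {
        self.deadline = None;
        std::mem::take(&mut self.pending_items)
    }

    // A batch is due only if a deadline is armed and has passed.
    fn is_due(&self, now: Instant) -> bool {
        self.deadline.map_or(false, |deadline| now >= deadline)
    }
}
```

Because later items do not reset the deadline, the oldest item in the batch bounds the latency for all of them.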
concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>
The batches that the worker is concurrently executing.
failed: Option<ServiceError>
An error that’s populated on permanent service failure.
error_handle: ErrorHandle
A shared error handle that’s populated on permanent service failure.
close: PollSemaphore
A cloned copy of the wrapper service’s semaphore, used to close the semaphore.
max_items_in_batch: usize
The maximum number of items allowed in a batch.
max_concurrent_batches: usize
The maximum number of batches that are allowed to run concurrently.
max_latency: Duration
The maximum delay before processing a batch with fewer than max_items_in_batch.
Implementations§
impl<T, Request> Worker<T, Request>
pub(crate) fn new(
    service: T,
    rx: UnboundedReceiver<Message<Request, T::Future>>,
    max_items_in_batch: usize,
    max_concurrent_batches: usize,
    max_latency: Duration,
    close: PollSemaphore,
) -> (ErrorHandle, Worker<T, Request>)
Creates a new batch worker.
See Batch::new() for details.
async fn process_req(
    &mut self,
    req: Request,
    tx: Sender<Result<T::Future, ServiceError>>,
)
Process a single worker request.
async fn flush_service(&mut self)
Tell the inner service to flush the current batch.
Waits until the inner service is ready, then stores a future which resolves when the batch finishes.
fn can_spawn_new_batches(&self) -> bool
Returns true if the current number of concurrent batches is below the configured limit, so another batch can be started.
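As a rough sketch of how such a limit check might gate new batches: the `BatchSlots` type below is hypothetical, a `Vec` of batch ids stands in for `concurrent_batches`, and the strict less-than comparison is an assumption rather than something this page states.

```rust
// Hypothetical miniature of the worker's concurrency bookkeeping.
struct BatchSlots {
    // Stands in for `concurrent_batches`: ids of batches still running.
    running: Vec<u64>,
    max_concurrent_batches: usize,
}

impl BatchSlots {
    // Assumed semantics: a new batch may start only while the number of
    // running batches is strictly below the configured limit.
    fn can_spawn_new_batches(&self) -> bool {
        self.running.len() < self.max_concurrent_batches
    }

    // Start a batch if a slot is free; report whether it was started.
    fn try_spawn(&mut self, batch_id: u64) -> bool {
        if self.can_spawn_new_batches() {
            self.running.push(batch_id);
            true
        } else {
            false
        }
    }

    // A finished batch frees its slot.
    fn finish(&mut self, batch_id: u64) {
        self.running.retain(|id| *id != batch_id);
    }
}
```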
pub async fn run(self)
Run loop for batch requests, which implements the batch policies.
See Batch::new() for details.
fn failed(&mut self, error: Box<dyn Error + Send + Sync + 'static>)
Register an inner service failure.
The underlying service failed when we called poll_ready on it with the given error. We need to communicate this to all the Buffer handles. To do so, we wrap up the error in an Arc, send that Arc<E> to all pending requests, and store it so that subsequent requests will also fail with the same error.
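The error-sharing scheme described above can be illustrated with standard-library types only. This is a simplified sketch, not the crate's implementation: `SharedError`, `MiniWorker`, `fail`, and `submit` are hypothetical names, and `std::sync::mpsc` senders stand in for the per-request response channels.

```rust
use std::error::Error;
use std::fmt;
use std::sync::mpsc::Sender;
use std::sync::Arc;

type BoxError = Box<dyn Error + Send + Sync + 'static>;

// Hypothetical cloneable error: an `Arc` makes one failure cheap to share.
#[derive(Clone, Debug)]
struct SharedError(Arc<BoxError>);

impl fmt::Display for SharedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "service failed: {}", self.0)
    }
}

// Hypothetical miniature worker: only the failure bookkeeping is modeled.
struct MiniWorker {
    pending: Vec<Sender<Result<u32, SharedError>>>,
    failed: Option<SharedError>,
}

impl MiniWorker {
    // Wrap the error once, send a clone to every pending request, then
    // store it so later requests observe the same failure.
    fn fail(&mut self, error: BoxError) {
        let shared = SharedError(Arc::new(error));
        for tx in self.pending.drain(..) {
            // A dropped receiver just means nobody is waiting any more.
            let _ = tx.send(Err(shared.clone()));
        }
        self.failed = Some(shared);
    }

    // After a failure, new requests are rejected with the stored error.
    fn submit(&mut self, tx: Sender<Result<u32, SharedError>>) {
        match &self.failed {
            Some(err) => {
                let _ = tx.send(Err(err.clone()));
            }
            None => self.pending.push(tx),
        }
    }
}
```

Cloning the `Arc` rather than the error itself means every caller sees the same underlying failure, even when the error type is not `Clone`.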
Trait Implementations§
impl<'pin, T, Request> Unpin for Worker<T, Request>
Auto Trait Implementations§
impl<T, Request> !Freeze for Worker<T, Request>
impl<T, Request> !RefUnwindSafe for Worker<T, Request>
impl<T, Request> Send for Worker<T, Request>
impl<T, Request> !Sync for Worker<T, Request>
impl<T, Request> !UnwindSafe for Worker<T, Request>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.