pub struct Worker<T, Request: RequestWeight>
where
    T: Service<BatchControl<Request>>,
    T::Future: Send + 'static,
    T::Error: Into<Box<dyn Error + Send + Sync + 'static>>,
{
    rx: UnboundedReceiver<Message<Request, T::Future>>,
    service: T,
    pending_items_weight: usize,
    pending_batch_timer: Option<Pin<Box<Sleep>>>,
    concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>,
    failed: Option<ServiceError>,
    error_handle: ErrorHandle,
    close: PollSemaphore,
    max_items_weight_in_batch: usize,
    max_concurrent_batches: usize,
    max_latency: Duration,
}
Task that handles processing the buffer. This type should not be used
directly; instead, Buffer requires an Executor that can accept this task.
The struct is pub in the private module and the type is not re-exported
as part of the public API. This is the “sealed” pattern to include “private”
types in public traits that are not meant for consumers of the library to
implement (only call).
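The visibility arrangement described above can be sketched as follows. This is a minimal illustration, not the crate's actual layout: the module name `worker`, the simplified `Worker` struct, and the `spawn_worker` function are all hypothetical.

```rust
// Hypothetical crate layout, for illustration only.
// The module is private, so downstream crates cannot name `worker::Worker`.
mod worker {
    /// Declared `pub` so it may appear in public signatures,
    /// but it is never re-exported from the crate root.
    pub struct Worker {
        pub(crate) pending: usize,
    }

    impl Worker {
        /// Only code inside this crate can construct a worker.
        pub(crate) fn new() -> Self {
            Worker { pending: 0 }
        }

        /// Consumers may call methods on a value they are handed.
        pub fn run(self) -> usize {
            self.pending
        }
    }
}

/// A public function that hands out the "private" type.
pub fn spawn_worker() -> worker::Worker {
    worker::Worker::new()
}
```

With this shape, library users can receive and call a `Worker`, but they cannot construct one or write its type path themselves.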
Fields§
rx: UnboundedReceiver<Message<Request, T::Future>>
A semaphore-bounded channel for receiving requests from the batch wrapper service.
service: T
The wrapped service that processes batches.
pending_items_weight: usize
The total weight of pending requests sent to service, since the last batch flush.
pending_batch_timer: Option<Pin<Box<Sleep>>>
The timer for the pending batch, if it has any items.
The timer is started when the first entry of a new batch is submitted, so that the batch latency of all entries is at most self.max_latency. However, we don’t keep the timer running unless there is a pending request to prevent wakeups on idle services.
concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>
The batches that the worker is concurrently executing.
failed: Option<ServiceError>
An error that’s populated on permanent service failure.
error_handle: ErrorHandle
A shared error handle that’s populated on permanent service failure.
close: PollSemaphore
A cloned copy of the wrapper service’s semaphore, used to close the semaphore.
max_items_weight_in_batch: usize
The maximum weight of pending items in a batch before it should be flushed and pending items should be added to a new batch.
max_concurrent_batches: usize
The maximum number of batches that are allowed to run concurrently.
max_latency: Duration
The maximum delay before processing a batch with items that have a total weight that is less than max_items_weight_in_batch.
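The interaction between pending_items_weight, pending_batch_timer, max_items_weight_in_batch and max_latency can be modelled with plain standard-library types. The sketch below is a simplified, synchronous stand-in, not the worker's implementation: `BatchState` and its methods are hypothetical, an `Option<Instant>` replaces the `Sleep` timer, and flushing once the weight reaches the limit (rather than strictly exceeds it) is an assumption.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified model of the worker's batching state.
struct BatchState {
    pending_items_weight: usize,
    /// Stands in for `pending_batch_timer`: set only while a batch is pending.
    batch_started_at: Option<Instant>,
    max_items_weight_in_batch: usize,
    max_latency: Duration,
}

impl BatchState {
    fn new(max_items_weight_in_batch: usize, max_latency: Duration) -> Self {
        BatchState {
            pending_items_weight: 0,
            batch_started_at: None,
            max_items_weight_in_batch,
            max_latency,
        }
    }

    /// Adds an item. Returns true if the batch should be flushed because
    /// the weight limit was reached (assumed to be an inclusive limit).
    fn push(&mut self, weight: usize, now: Instant) -> bool {
        // The timer starts with the first entry of a new batch.
        if self.batch_started_at.is_none() {
            self.batch_started_at = Some(now);
        }
        self.pending_items_weight += weight;
        self.pending_items_weight >= self.max_items_weight_in_batch
    }

    /// True if a batch is pending and has waited at least `max_latency`.
    /// An idle state has no timer, so it never reports expiry.
    fn timer_expired(&self, now: Instant) -> bool {
        match self.batch_started_at {
            Some(start) => now.duration_since(start) >= self.max_latency,
            None => false,
        }
    }

    /// Clears the pending batch and its timer, returning the flushed weight.
    fn flush(&mut self) -> usize {
        self.batch_started_at = None;
        std::mem::take(&mut self.pending_items_weight)
    }
}
```

Clearing the timer in `flush` mirrors the documented intent of avoiding wakeups while the service is idle.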
Implementations§
impl<T, Request: RequestWeight> Worker<T, Request>
pub(crate) fn new(
    service: T,
    rx: UnboundedReceiver<Message<Request, T::Future>>,
    max_items_weight_in_batch: usize,
    max_concurrent_batches: usize,
    max_latency: Duration,
    close: PollSemaphore,
) -> (ErrorHandle, Worker<T, Request>)
Creates a new batch worker.
See Batch::new() for details.
async fn process_req(
    &mut self,
    req: Request,
    tx: Sender<Result<T::Future, ServiceError>>,
)
Process a single worker request.
async fn flush_service(&mut self)
Tell the inner service to flush the current batch.
Waits until the inner service is ready, then stores a future which resolves when the batch finishes.
fn can_spawn_new_batches(&self) -> bool
Returns true if the current number of concurrent batches is below the configured limit, so a new batch can be started.
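As a rough illustration of the concurrency check, the sketch below tracks in-flight batches with a counter. It assumes, based on the method name, that the method returns true while there is room for another batch and that the comparison is a strict less-than against max_concurrent_batches. `BatchSlots`, `try_start` and `finish` are hypothetical names, and the counter stands in for `concurrent_batches.len()`.

```rust
/// Hypothetical stand-in for the worker's concurrency bookkeeping.
struct BatchSlots {
    /// Stands in for `concurrent_batches.len()`.
    in_flight: usize,
    max_concurrent_batches: usize,
}

impl BatchSlots {
    /// Assumed semantics: true while another batch may be started.
    fn can_spawn_new_batches(&self) -> bool {
        self.in_flight < self.max_concurrent_batches
    }

    /// Starts a batch if a slot is free; returns whether it was started.
    fn try_start(&mut self) -> bool {
        if self.can_spawn_new_batches() {
            self.in_flight += 1;
            true
        } else {
            false
        }
    }

    /// Marks one batch as finished, freeing its slot.
    fn finish(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
    }
}
```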
pub async fn run(self)
Run loop for batch requests, which implements the batch policies.
See Batch::new() for details.
fn failed(&mut self, error: Box<dyn Error + Send + Sync + 'static>)
Register an inner service failure.
The underlying service failed when we called poll_ready on it with the given error. We
need to communicate this to all the Buffer handles. To do so, we wrap up the error in
an Arc, send that Arc<E> to all pending requests, and store it so that subsequent
requests will also fail with the same error.
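The error-sharing scheme described above can be sketched with standard-library types only. This is an illustration under assumptions, not the crate's code: `SharedError` stands in for `ServiceError`, `Handle` for `ErrorHandle`, and `MiniWorker` for the worker. Keeping the first error when `failed` is called more than once is also an assumption.

```rust
use std::error::Error;
use std::fmt;
use std::sync::{Arc, Mutex};

type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Hypothetical stand-in for `ServiceError`: cloning only bumps the `Arc`.
#[derive(Clone, Debug)]
struct SharedError(Arc<BoxError>);

impl fmt::Display for SharedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "batching service failed: {}", self.0)
    }
}

/// Hypothetical stand-in for `ErrorHandle`, shared with the service handles.
#[derive(Clone, Default)]
struct Handle {
    inner: Arc<Mutex<Option<SharedError>>>,
}

impl Handle {
    /// Returns the stored failure, if the worker has recorded one.
    fn get(&self) -> Option<SharedError> {
        self.inner.lock().unwrap().clone()
    }
}

/// Hypothetical, stripped-down worker that only tracks failure state.
struct MiniWorker {
    failed: Option<SharedError>,
    handle: Handle,
}

impl MiniWorker {
    /// Records a permanent failure once; later calls keep the first error.
    fn failed(&mut self, error: BoxError) {
        let mut slot = self.handle.inner.lock().unwrap();
        if slot.is_some() {
            return;
        }
        let shared = SharedError(Arc::new(error));
        // Every handle observes the same `Arc`, so all callers see one error.
        *slot = Some(shared.clone());
        drop(slot);
        self.failed = Some(shared);
    }
}
```

Wrapping the boxed error in an `Arc` is what makes it possible to hand the same failure to every pending and future request, since a `Box<dyn Error>` on its own cannot be cloned.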
Trait Implementations§
impl<T, Request: RequestWeight> Drop for Worker<T, Request>
impl<'pin, T, Request: RequestWeight> Unpin for Worker<T, Request>
Auto Trait Implementations§
impl<T, Request> !Freeze for Worker<T, Request>
impl<T, Request> !RefUnwindSafe for Worker<T, Request>
impl<T, Request> Send for Worker<T, Request>
impl<T, Request> !Sync for Worker<T, Request>
impl<T, Request> !UnwindSafe for Worker<T, Request>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.