Struct tower_batch_control::service::Batch
source · pub struct Batch<T, Request>where
T: Service<BatchControl<Request>>,{
tx: UnboundedSender<Message<Request, T::Future>>,
semaphore: PollSemaphore,
permit: Option<OwnedSemaphorePermit>,
error_handle: ErrorHandle,
worker_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
Expand description
Allows batch processing of requests.
See the crate documentation for more details.
Fields§
§tx: UnboundedSender<Message<Request, T::Future>>
A custom-bounded channel for sending requests to the batch worker.
Note: this actually is bounded, but rather than using Tokio’s unbounded channel, we use tokio’s semaphore separately to implement the bound.
semaphore: PollSemaphore
A semaphore used to bound the channel.
When the buffer’s channel is full, we want to exert backpressure in
poll_ready
, so that callers such as load balancers could choose to call
another service rather than waiting for buffer capacity.
Unfortunately, this can’t be done easily using Tokio’s bounded MPSC channel, because it doesn’t wake pending tasks on close. Therefore, we implement our own bounded MPSC on top of the unbounded channel, using a semaphore to limit how many items are in the channel.
permit: Option<OwnedSemaphorePermit>
A semaphore permit that allows this service to send one message on tx
.
error_handle: ErrorHandle
An error handle shared between all service clones for the same worker.
worker_handle: Arc<Mutex<Option<JoinHandle<()>>>>
A worker task handle shared between all service clones for the same worker.
Only used when the worker is spawned on the tokio runtime.
Implementations§
source§impl<T, Request> Batch<T, Request>
impl<T, Request> Batch<T, Request>
sourcepub fn new(
service: T,
max_items_in_batch: usize,
max_batches: impl Into<Option<usize>>,
max_latency: Duration,
) -> Self
pub fn new( service: T, max_items_in_batch: usize, max_batches: impl Into<Option<usize>>, max_latency: Duration, ) -> Self
Creates a new Batch
wrapping service
.
The wrapper is responsible for telling the inner service when to flush a batch of requests. These parameters control this policy:
max_items_in_batch
gives the maximum number of items per batch.max_batches
is an upper bound on the number of batches in the queue, and the number of concurrently executing batches. If this isNone
, we use the current number of [rayon
] threads. The number of batches in the queue is also limited byQUEUE_BATCH_LIMIT
.max_latency
gives the maximum latency for a batch item to start verifying.
The default Tokio executor is used to run the given service, which means that this method must be called while on the Tokio runtime.
sourcepub fn pair(
service: T,
max_items_in_batch: usize,
max_batches: impl Into<Option<usize>>,
max_latency: Duration,
) -> (Self, Worker<T, Request>)
pub fn pair( service: T, max_items_in_batch: usize, max_batches: impl Into<Option<usize>>, max_latency: Duration, ) -> (Self, Worker<T, Request>)
Creates a new Batch
wrapping service
, but returns the background worker.
This is useful if you do not want to spawn directly onto the tokio
runtime but instead want to use your own executor. This will return the
Batch
and the background Worker
that you can then spawn.
sourcepub fn register_worker(&mut self, worker_handle: JoinHandle<()>)
pub fn register_worker(&mut self, worker_handle: JoinHandle<()>)
Ask the Batch
to monitor the spawned worker task’s [JoinHandle
].
Only used when the task is spawned on the tokio runtime.
Trait Implementations§
source§impl<T, Request> Clone for Batch<T, Request>where
T: Service<BatchControl<Request>>,
impl<T, Request> Clone for Batch<T, Request>where
T: Service<BatchControl<Request>>,
source§impl<T, Request> Debug for Batch<T, Request>where
T: Service<BatchControl<Request>>,
impl<T, Request> Debug for Batch<T, Request>where
T: Service<BatchControl<Request>>,
source§impl<T, Request> Service<Request> for Batch<T, Request>
impl<T, Request> Service<Request> for Batch<T, Request>
source§type Response = <T as Service<BatchControl<Request>>>::Response
type Response = <T as Service<BatchControl<Request>>>::Response
source§type Future = ResponseFuture<<T as Service<BatchControl<Request>>>::Future>
type Future = ResponseFuture<<T as Service<BatchControl<Request>>>::Future>
Auto Trait Implementations§
impl<T, Request> Freeze for Batch<T, Request>
impl<T, Request> !RefUnwindSafe for Batch<T, Request>
impl<T, Request> Send for Batch<T, Request>
impl<T, Request> Sync for Batch<T, Request>
impl<T, Request> Unpin for Batch<T, Request>
impl<T, Request> !UnwindSafe for Batch<T, Request>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
source§unsafe fn clone_to_uninit(&self, dst: *mut T)
unsafe fn clone_to_uninit(&self, dst: *mut T)
clone_to_uninit
)§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more§impl<M, S, Target, Request> MakeService<Target, Request> for Mwhere
M: Service<Target, Response = S>,
S: Service<Request>,
impl<M, S, Target, Request> MakeService<Target, Request> for Mwhere
M: Service<Target, Response = S>,
S: Service<Request>,
§fn poll_ready(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), <M as MakeService<Target, Request>>::MakeError>>
fn poll_ready( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), <M as MakeService<Target, Request>>::MakeError>>
Poll::Ready
when the factory is able to create more services. Read more§fn make_service(
&mut self,
target: Target,
) -> <M as MakeService<Target, Request>>::Future
fn make_service( &mut self, target: Target, ) -> <M as MakeService<Target, Request>>::Future
§fn into_service(self) -> IntoService<Self, Request>where
Self: Sized,
fn into_service(self) -> IntoService<Self, Request>where
Self: Sized,
§fn as_service(&mut self) -> AsService<'_, Self, Request>where
Self: Sized,
fn as_service(&mut self) -> AsService<'_, Self, Request>where
Self: Sized,
§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T, Request> ServiceExt<Request> for Twhere
T: Service<Request> + ?Sized,
impl<T, Request> ServiceExt<Request> for Twhere
T: Service<Request> + ?Sized,
§fn ready(&mut self) -> Ready<'_, Self, Request>where
Self: Sized,
fn ready(&mut self) -> Ready<'_, Self, Request>where
Self: Sized,
§fn ready_and(&mut self) -> Ready<'_, Self, Request>where
Self: Sized,
fn ready_and(&mut self) -> Ready<'_, Self, Request>where
Self: Sized,
ServiceExt::ready
method instead§fn ready_oneshot(self) -> ReadyOneshot<Self, Request>where
Self: Sized,
fn ready_oneshot(self) -> ReadyOneshot<Self, Request>where
Self: Sized,
§fn oneshot(self, req: Request) -> Oneshot<Self, Request>where
Self: Sized,
fn oneshot(self, req: Request) -> Oneshot<Self, Request>where
Self: Sized,
Service
, calling with the providing request once it is ready.§fn and_then<F>(self, f: F) -> AndThen<Self, F>
fn and_then<F>(self, f: F) -> AndThen<Self, F>
poll_ready
method. Read more§fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
poll_ready
method. Read more§fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
poll_ready
method. Read more§fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
Result<Self::Response, Self::Error>
)
to a different value, regardless of whether the future succeeds or
fails. Read more§fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
§fn filter<F, NewRequest>(self, filter: F) -> Filter<Self, F>where
Self: Sized,
F: Predicate<NewRequest>,
fn filter<F, NewRequest>(self, filter: F) -> Filter<Self, F>where
Self: Sized,
F: Predicate<NewRequest>,
§fn filter_async<F, NewRequest>(self, filter: F) -> AsyncFilter<Self, F>where
Self: Sized,
F: AsyncPredicate<NewRequest>,
fn filter_async<F, NewRequest>(self, filter: F) -> AsyncFilter<Self, F>where
Self: Sized,
F: AsyncPredicate<NewRequest>,
AsyncFilter
that conditionally accepts or
rejects requests based on an [async predicate]. Read more