XRootD
Loading...
Searching...
No Matches
XrdClParallelOperation.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// This file is part of the XRootD software suite.
7//
8// XRootD is free software: you can redistribute it and/or modify
9// it under the terms of the GNU Lesser General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// XRootD is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public License
19// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20//
21// In applying this licence, CERN does not waive the privileges and immunities
22// granted to it by virtue of its status as an Intergovernmental Organization
23// or submit itself to any jurisdiction.
24//------------------------------------------------------------------------------
25
26#ifndef __XRD_CL_PARALLELOPERATION_HH__
27#define __XRD_CL_PARALLELOPERATION_HH__
28
34
35#include <atomic>
36#include <condition_variable>
37#include <mutex>
38
39namespace XrdCl
40{
41
42 //----------------------------------------------------------------------------
43 // Interface for different execution policies:
44 // - all : all operations need to succeed in order for the parallel
45 // operation to be successful
46 // - any : just one of the operations needs to succeed in order for
47 // the parallel operation to be successful
48 // - some : n (user defined) operations need to succeed in order for
49 // the parallel operation to be successful
50 // - at least : at least n (user defined) operations need to succeed in
51 // order for the parallel operation to be successful (the
52 // user handler will be called only when all operations are
53 // resolved)
54 //
55 // @param status : status returned by one of the aggregated operations
56 //
57 // @return : true if the status should be passed to the user handler,
58 // false otherwise.
59 //----------------------------------------------------------------------------
61 {
63 {
64 }
65
66 virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
67
68 virtual XRootDStatus Result() = 0;
69 };
70
71 //----------------------------------------------------------------------------
77 //----------------------------------------------------------------------------
78 template<bool HasHndl>
79 class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
80 {
81 template<bool> friend class ParallelOperation;
82
83 public:
84
85 //------------------------------------------------------------------------
87 //------------------------------------------------------------------------
88 template<bool from>
90 ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
91 pipelines( std::move( obj.pipelines ) ),
92 policy( std::move( obj.policy ) )
93 {
94 }
95
96 //------------------------------------------------------------------------
102 //------------------------------------------------------------------------
103 template<class Container>
104 ParallelOperation( Container &&container )
105 {
106 static_assert( !HasHndl, "Constructor is available only operation without handler");
107
108 pipelines.reserve( container.size() );
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter( pipelines ) );
112 container.clear(); // there's junk inside so we clear it
113 }
114
116 {
117 }
118
119 //------------------------------------------------------------------------
121 //------------------------------------------------------------------------
122 std::string ToString()
123 {
124 std::ostringstream oss;
125 oss << "Parallel(";
126 for( size_t i = 0; i < pipelines.size(); i++ )
127 {
128 oss << pipelines[i]->ToString();
129 if( i + 1 != pipelines.size() )
130 {
131 oss << " && ";
132 }
133 }
134 oss << ")";
135 return oss.str();
136 }
137
138 //------------------------------------------------------------------------
143 //------------------------------------------------------------------------
145 {
146 policy.reset( new AllPolicy() );
147 return std::move( *this );
148 }
149
150 //------------------------------------------------------------------------
155 //------------------------------------------------------------------------
157 {
158 policy.reset( new AnyPolicy( pipelines.size() ) );
159 return std::move( *this );
160 }
161
162 //------------------------------------------------------------------------
163 // Set policy to `Some`
167 //------------------------------------------------------------------------
169 {
170 policy.reset( new SomePolicy( pipelines.size(), threshold ) );
171 return std::move( *this );
172 }
173
174 //------------------------------------------------------------------------
180 //------------------------------------------------------------------------
182 {
183 policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
184 return std::move( *this );
185 }
186
187 private:
188
189 //------------------------------------------------------------------------
194 //------------------------------------------------------------------------
195 struct AllPolicy : public PolicyExecutor
196 {
197 bool Examine( const XrdCl::XRootDStatus &status )
198 {
199 // keep the status in case this is the final result
200 res = status;
201 if( status.IsOK() ) return false;
202 // we require all request to succeed
203 return true;
204 }
205
206 XRootDStatus Result()
207 {
208 return res;
209 }
210
211 XRootDStatus res;
212 };
213
214 //------------------------------------------------------------------------
219 //------------------------------------------------------------------------
220 struct AnyPolicy : public PolicyExecutor
221 {
222 AnyPolicy( size_t size) : cnt( size )
223 {
224 }
225
226 bool Examine( const XrdCl::XRootDStatus &status )
227 {
228 // keep the status in case this is the final result
229 res = status;
230 // decrement the counter
231 size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
232 // we require just one operation to be successful
233 if( status.IsOK() ) return true;
234 // lets see if this is the last one?
235 if( nb == 1 ) return true;
236 // we still have a chance there will be one that is successful
237 return false;
238 }
239
240 XRootDStatus Result()
241 {
242 return res;
243 }
244
245 private:
246 std::atomic<size_t> cnt;
247 XRootDStatus res;
248 };
249
250 //------------------------------------------------------------------------
255 //------------------------------------------------------------------------
256 struct SomePolicy : PolicyExecutor
257 {
258 SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
259 threshold( threshold ), size( size )
260 {
261 }
262
263 bool Examine( const XrdCl::XRootDStatus &status )
264 {
265 // keep the status in case this is the final result
266 res = status;
267 if( status.IsOK() )
268 {
269 size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
270 if( s + 1 == threshold ) return true; // we reached the threshold
271 // we are not yet there
272 return false;
273 }
274 size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
275 // did we drop below the threshold
276 if( f == size - threshold ) return true;
277 // we still have a chance there will be enough of successful operations
278 return false;
279 }
280
281 XRootDStatus Result()
282 {
283 return res;
284 }
285
286 private:
287 std::atomic<size_t> failed;
288 std::atomic<size_t> succeeded;
289 const size_t threshold;
290 const size_t size;
291 XRootDStatus res;
292 };
293
294 //------------------------------------------------------------------------
300 //------------------------------------------------------------------------
301 struct AtLeastPolicy : PolicyExecutor
302 {
303 AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
304 failed_cnt( 0 ),
305 failed_threshold( size - threshold )
306 {
307 }
308
309 //----------------------------------------------------------------------
313 //----------------------------------------------------------------------
314 bool Examine( const XrdCl::XRootDStatus &status )
315 {
316 if (!status.IsOK()) {
317 if (failed_cnt.fetch_add(1, std::memory_order_relaxed) == failed_threshold) {
318 res = status;
319 return true;
320 }
321 }
322
323 return pending_cnt.fetch_sub(1, std::memory_order_relaxed) == 1;
324 }
325
326 XRootDStatus Result()
327 {
328 return res;
329 }
330
331 private:
332 std::atomic<size_t> pending_cnt;
333 std::atomic<size_t> failed_cnt;
334 const size_t failed_threshold;
335 XRootDStatus res;
336 };
337
338 //------------------------------------------------------------------------
340 //------------------------------------------------------------------------
// One-shot barrier: wait() blocks for as long as the barrier is on,
// lift() switches it off and releases every waiter. The barrier starts
// in the "on" state and cannot be switched back on.
struct barrier_t
{
  barrier_t() : on( true ) { }

  // Block the calling thread until the barrier has been lifted; returns
  // immediately if it is already lifted.
  void wait()
  {
    std::unique_lock<std::mutex> lck( mtx );
    // the predicate re-checks the flag after every wakeup, so a spurious
    // wakeup of the condition variable cannot let a waiter through early
    cv.wait( lck, [this]{ return !on; } );
  }

  // Switch the barrier off and wake up all threads blocked in wait().
  void lift()
  {
    std::lock_guard<std::mutex> lck( mtx );
    on = false;
    cv.notify_all();
  }

  private:
    std::condition_variable cv;  //< signalled when the barrier is lifted
    std::mutex              mtx; //< guards `on`
    bool                    on;  //< true while the barrier blocks waiters
};
363
364 //------------------------------------------------------------------------
369 //------------------------------------------------------------------------
370 struct Ctx
371 {
372 //----------------------------------------------------------------------
376 //----------------------------------------------------------------------
377 Ctx( PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
378 policy( policy )
379 {
380 }
381
382 //----------------------------------------------------------------------
384 //----------------------------------------------------------------------
385 ~Ctx()
386 {
387 Handle( XRootDStatus() );
388 }
389
390 //----------------------------------------------------------------------
395 //----------------------------------------------------------------------
396 inline void Examine( const XRootDStatus &st )
397 {
398 if( policy->Examine( st ) )
399 Handle( policy->Result() );
400 }
401
402 //----------------------------------------------------------------------
407 //---------------------------------------------------------------------
408 inline void Handle( const XRootDStatus &st )
409 {
410 PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
411 if( hdlr )
412 {
413 barrier.wait();
414 hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
415 }
416 }
417
418 //----------------------------------------------------------------------
420 //----------------------------------------------------------------------
421 std::atomic<PipelineHandler*> handler;
422
423 //----------------------------------------------------------------------
425 //----------------------------------------------------------------------
426 std::unique_ptr<PolicyExecutor> policy;
427
428 //----------------------------------------------------------------------
431 //----------------------------------------------------------------------
432 barrier_t barrier;
433 };
434
435 //------------------------------------------------------------------------
437 //------------------------------------------------------------------------
438 struct PipelineEnd : public Job
439 {
440 //----------------------------------------------------------------------
441 // Constructor
442 //----------------------------------------------------------------------
443 PipelineEnd( std::shared_ptr<Ctx> &ctx,
444 const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
445 {
446 }
447
448 //----------------------------------------------------------------------
449 // Run Ctx::Examine in the thread-pool
450 //----------------------------------------------------------------------
451 void Run( void* )
452 {
453 ctx->Examine( st );
454 delete this;
455 }
456
457 private:
458 std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
459 XrdCl::XRootDStatus st; //< final status of the ParallelOperation
460 };
461
462 //------------------------------------------------------------------------
464 //------------------------------------------------------------------------
465 inline static
466 void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
467 {
468 XrdCl::JobManager *mgr = XrdCl::DefaultEnv::GetPostMaster()->GetJobManager();
469 PipelineEnd *end = new PipelineEnd( ctx, st );
470 mgr->QueueJob( end, nullptr );
471 }
472
473 //------------------------------------------------------------------------
479 //------------------------------------------------------------------------
480 XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
481 {
482 // make sure we have a valid policy for the parallel operation
483 if( !policy ) policy.reset( new AllPolicy() );
484
485 std::shared_ptr<Ctx> ctx =
486 std::make_shared<Ctx>( handler, policy.release() );
487
488 uint16_t timeout = pipelineTimeout < this->timeout ?
489 pipelineTimeout : this->timeout;
490
491 for( size_t i = 0; i < pipelines.size(); ++i )
492 {
493 if( !pipelines[i] ) continue;
494 pipelines[i].Run( timeout,
495 [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
496 }
497
498 ctx->barrier.lift();
499 return XRootDStatus();
500 }
501
502 std::vector<Pipeline> pipelines;
503 std::unique_ptr<PolicyExecutor> policy;
504 };
505
506 //----------------------------------------------------------------------------
508 //----------------------------------------------------------------------------
509 template<class Container>
510 inline ParallelOperation<false> Parallel( Container &&container )
511 {
512 return ParallelOperation<false>( container );
513 }
514
515 //----------------------------------------------------------------------------
517 //----------------------------------------------------------------------------
518 inline void PipesToVec( std::vector<Pipeline>& )
519 {
520 // base case
521 }
522
523 //----------------------------------------------------------------------------
524 // Declare PipesToVec (we need to declare those functions ahead of their
525 // definitions, as they may call each other).
526 //----------------------------------------------------------------------------
527 template<typename ... Others>
528 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
529 Others&... others );
530
531 template<typename ... Others>
532 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
533 Others&... others );
534
535 template<typename ... Others>
536 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
537 Others&... others );
538
539 //----------------------------------------------------------------------------
540 // Define PipesToVec
541 //----------------------------------------------------------------------------
542 template<typename ... Others>
543 void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
544 Others&... others )
545 {
546 v.emplace_back( operation );
547 PipesToVec( v, others... );
548 }
549
550 template<typename ... Others>
551 void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
552 Others&... others )
553 {
554 v.emplace_back( operation );
555 PipesToVec( v, others... );
556 }
557
558 template<typename ... Others>
559 void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
560 Others&... others )
561 {
562 v.emplace_back( std::move( pipeline ) );
563 PipesToVec( v, others... );
564 }
565
566 //----------------------------------------------------------------------------
571 //----------------------------------------------------------------------------
572 template<typename ... Operations>
573 inline ParallelOperation<false> Parallel( Operations&& ... operations )
574 {
575 constexpr size_t size = sizeof...( operations );
576 std::vector<Pipeline> v;
577 v.reserve( size );
578 PipesToVec( v, operations... );
579 return Parallel( v );
580 }
581}
582
583#endif // __XRD_CL_PARALLELOPERATION_HH__
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
friend class PipelineHandler
std::unique_ptr< PipelineHandler > handler
Operation handler.
ParallelOperation< HasHndl > Some(size_t threshold)
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
ParallelOperation(Container &&container)
ParallelOperation< HasHndl > All()
ParallelOperation< HasHndl > Any()
ParallelOperation< HasHndl > AtLeast(size_t threshold)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
bool IsOK() const
We're fine.