XRootD
Loading...
Searching...
No Matches
XrdSsiFileReq.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S s i F i l e R e q . c c */
4/* */
5/* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <cstdio>
31#include <cstring>
32#include <arpa/inet.h>
33#include <sys/types.h>
34
38#include "XrdSfs/XrdSfsDio.hh"
39#include "XrdSsi/XrdSsiAlert.hh"
45#include "XrdSsi/XrdSsiSfs.hh"
47#include "XrdSsi/XrdSsiStats.hh"
48#include "XrdSsi/XrdSsiTrace.hh"
49#include "XrdSsi/XrdSsiUtils.hh"
50#include "XrdSys/XrdSysError.hh"
51
52/******************************************************************************/
53/* L o c a l M a c r o s */
54/******************************************************************************/
55
56#define DEBUGXQ(x) DEBUG(rID<<sessN<<rspstID[urState]<<reqstID[myState]<<x)
57
58#define DUMPIT(x,y) XrdSsiUtils::b2x(x,y,hexBuff,sizeof(hexBuff),dotBuff)<<dotBuff
59
60/******************************************************************************/
61/* G l o b a l s */
62/******************************************************************************/
63
64namespace XrdSsi
65{
67extern XrdScheduler *Sched;
70};
71
72using namespace XrdSsi;
73
74/******************************************************************************/
75/* S t a t i c L o c a l s */
76/******************************************************************************/
77
78namespace
79{
80const char *rspstID[XrdSsiFileReq::isMax] =
81 {" [new", " [begun", " [bound",
82 " [abort", " [done"
83 };
84
85const char *reqstID[XrdSsiFileReq::rsEnd] =
86 {" wtReq] ", " xqReq] ", " wtRsp] ",
87 " doRsp] ", " odRsp] ", " erRsp] "
88 };
89};
90
91/******************************************************************************/
92/* L o c a l C l a s s e s */
93/******************************************************************************/
94
95namespace
96{
97class FinalizeJob : public XrdJob
98{
99public:
100
101void DoIt() {reqP->Finalize();
102 fileP->DeferredFinalizeDone(reqP, reqID);
103 delete this;
104 }
105
106 FinalizeJob(XrdSsiFileReq *rP, XrdSsiFileSess *fP, unsigned int id) :
107 reqP(rP), fileP(fP), reqID(id) {}
108 ~FinalizeJob() {}
109
110private:
111XrdSsiFileReq *reqP;
112XrdSsiFileSess *fileP;
113unsigned int reqID;
114};
115}
116
117/******************************************************************************/
118/* S t a t i c M e m b e r s */
119/******************************************************************************/
120
121XrdSysMutex XrdSsiFileReq::aqMutex;
122XrdSsiFileReq *XrdSsiFileReq::freeReq = 0;
123int XrdSsiFileReq::freeCnt = 0;
124int XrdSsiFileReq::freeMax = 256;
125
126/******************************************************************************/
127/* A c t i v a t e */
128/******************************************************************************/
129
131{
132 EPNAME("Activate");
133
134// Do some debugging
135//
136 DEBUGXQ((oP ? "oucbuff" : "sfsbuff") <<" rqsz=" <<rSz);
137
138// Do statistics
139//
140 Stats.statsMutex.Lock();
141 Stats.ReqCount++;
142 Stats.ReqBytes += rSz;
143 if (rSz > Stats.ReqMaxsz) Stats.ReqMaxsz = rSz;
144 Stats.statsMutex.UnLock();
145
146// Set request buffer pointers
147//
148 oucBuff = oP;
149 sfsBref = bR;
150 reqSize = rSz;
151
152// Now schedule ourselves to process this request. The state is new.
153//
154 Sched->Schedule((XrdJob *)this);
155}
156
157/******************************************************************************/
158/* A l e r t */
159/******************************************************************************/
160
162{
163 EPNAME("Alert");
164 XrdSsiAlert *aP;
165 int msgLen;
166
167// Do some debugging
168//
169 aMsg.GetMsg(msgLen);
170 DEBUGXQ(msgLen <<" byte alert presented wtr=" <<respWait);
171
172// Add up statistics
173//
174 Stats.Bump(Stats.ReqAlerts);
175
176// Lock this object
177//
178 frqMutex.Lock();
179
180// Validate the length and whether this call is allowed
181//
182 if (msgLen <= 0 || haveResp || isEnding)
183 {frqMutex.UnLock();
184 aMsg.RecycleMsg();
185 return;
186 }
187
188// Allocate an alert object and chain it into the pending queue
189//
190 aP = XrdSsiAlert::Alloc(aMsg);
191
192// Alerts must be sent in the order they are presented. So, check if we need
193// to chain this and try to send the first in the chain. This only really
194// matters if we can send the alert now because the client is waiting.
195//
196 if (respWait)
197 {if (alrtPend)
198 {alrtLast->next = aP;
199 alrtLast = aP;
200 aP = alrtPend;
201 alrtPend = alrtPend->next;
202 }
203 WakeUp(aP);
204 } else {
205 if (alrtLast) alrtLast->next = aP;
206 else alrtPend = aP;
207 alrtLast = aP;
208 }
209
210// All done
211//
212 frqMutex.UnLock();
213}
214
215/******************************************************************************/
216/* A l l o c */
217/******************************************************************************/
218
221 XrdSsiFileSess *fP,
222 const char *sID,
223 const char *cID,
224 unsigned int rnum)
225{
226 XrdSsiFileReq *nP;
227
227// Check if we can grab this from our queue
229//
230 aqMutex.Lock();
231 if ((nP = freeReq))
232 {freeCnt--;
233 freeReq = nP->nextReq;
234 aqMutex.UnLock();
235 nP->Init(cID);
236 } else {
237 aqMutex.UnLock();
238 nP = new XrdSsiFileReq(cID);
239 }
240
241// Initialize for processing
242//
243 if (nP)
244 {nP->sessN = sID;
245 nP->fileR = rP;
246 nP->fileP = fP;
247 nP->cbInfo = eiP;
248 nP->reqID = rnum;
249 snprintf(nP->rID, sizeof(nP->rID), "%u:", rnum);
250 }
251
252// Return the pointer
253//
254 return nP;
255}
256
257/******************************************************************************/
258/* Private: B i n d D o n e */
259/******************************************************************************/
260
261// This is called with frqMutex locked!
262
263void XrdSsiFileReq::BindDone()
264{
265 EPNAME("BindDone");
266
267// Do some debugging
268//
269 DEBUGXQ("Bind called; for request " <<reqID);
270
271// Collect statistics
272//
273 Stats.Bump(Stats.ReqBound);
274
275// Processing depends on the current state. Only listed states are valid.
276// When the state is done, a finished event occuured between the time the
277// request was handed off to the service but before the service bound it.
278//
279 switch(urState)
280 {case isBegun: urState = isBound;
281 case isBound: return;
282 break;
283 case isDone: if (!schedDone)
284 {schedDone = true;
285 Sched->Schedule((XrdJob *)this);
286 }
287 return;
288 break;
289 default: break;
290 }
291
292// If we get here then we have an invalid state. Report it but otherwise we
293// can't really do anything else. This means some memory may be lost.
294//
295 Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
296}
297
298/******************************************************************************/
299/* D e f e r r e d F i n a l i z e */
300/******************************************************************************/
301
303{
304 Sched->Schedule(new FinalizeJob(this, fileP, reqID));
305}
306
307/******************************************************************************/
308/* D i s p o s e */
309/******************************************************************************/
310
311void XrdSsiFileReq::Dispose()
312{
313 EPNAME("Dispose");
314
315// Do some debugging
316//
317 DEBUGXQ("Recycling request...");
318
319// Collect statistics
320//
321 Stats.Bump(Stats.ReqBound, -1);
322
323// Simply recycle the object
324//
325 Recycle();
326}
327
328/******************************************************************************/
329/* D o I t */
330/******************************************************************************/
331
333{
334 EPNAME("DoIt");
335 bool cancel;
336
337// Processing is determined by the responder's state. Only listed states are
338// valid. Others should never occur in this context.
339//
340 frqMutex.Lock();
341 switch(urState)
342 {case isNew: myState = xqReq; urState = isBegun;
343 DEBUGXQ("Calling service processor");
344 frqMutex.UnLock();
345 Stats.Bump(Stats.ReqProcs);
346 Service->ProcessRequest((XrdSsiRequest &)*this,
347 (XrdSsiFileResource &)*fileR);
348 return;
349 break;
350 case isAbort: DEBUGXQ("Skipped calling service processor");
351 frqMutex.UnLock();
352 Stats.Bump(Stats.ReqAborts);
353 Recycle();
354 return;
355 break;
356 case isDone: cancel = (myState != odRsp);
357 DEBUGXQ("Calling Finished(" <<cancel <<')');
358 if (respWait) WakeUp();
359 if (finWait) finWait->Post();
360 frqMutex.UnLock();
361 Stats.Bump(Stats.ReqFinished);
362 if (cancel) Stats.Bump(Stats.ReqCancels);
363 Finished(cancel); // This object may be deleted!
364 return;
365 break;
366 default: break;
367 }
368
369// If we get here then we have an invalid state. Report it but otherwise we
370// can't really do anything else. This means some memory may be lost.
371//
372 frqMutex.UnLock();
373 Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
374}
375
376/******************************************************************************/
377/* D o n e */
378/******************************************************************************/
379
380// Gets invoked only after query() waitresp signal was sent
381
382void XrdSsiFileReq::Done(int &retc, XrdOucErrInfo *eiP, const char *name)
383{
384 EPNAME("Done");
385 XrdSsiMutexMon mHelper(frqMutex);
386
387// We may need to delete the errinfo object if this callback was async. Note
388// that the following test is valid even if the file object has been deleted.
389//
390 if (eiP != fileP->errInfo()) delete eiP;
391
392// Check if we should finalize this request. This will be the case if the
393// complete response was sent.
394//
395 if (myState == odRsp)
396 {DEBUGXQ("resp sent; no additional data remains");
397 if (!fileP->DeferFinalize(this,reqID)) Finalize();
398 return;
399 }
400
401// Do some debugging
402//
403 DEBUGXQ("wtrsp sent; resp " <<(haveResp ? "here" : "pend"));
404
405// We are invoked when sync() waitresp has been sent, check if a response was
406// posted while this was going on. If so, make sure to send a wakeup. Note
407// that the respWait flag is at this moment false as this is called in the
408// sync response path for fctl() and the response may have been posted.
409//
410 if (!haveResp) respWait = true;
411 else WakeUp();
412}
413
414/******************************************************************************/
415/* Private: E m s g */
416/******************************************************************************/
417
418int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value
419 int ecode, // The error code
420 const char *op) // Operation being performed
421{
422 char buffer[2048];
423
424// Count errors
425//
426 Stats.Bump(Stats.SsiErrs);
427
428// Get correct error code
429//
430 if (ecode < 0) ecode = -ecode;
431
432// Format the error message
433//
434 XrdOucERoute::Format(buffer, sizeof(buffer), ecode, op, sessN);
435
436// Put the message in the log
437//
438 Log.Emsg(pfx, tident, buffer);
439
440// Place the error message in the error object and return
441//
442 if (cbInfo) cbInfo->setErrInfo(ecode, buffer);
443 return SFS_ERROR;
444}
445
446/******************************************************************************/
447
448int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value
449 XrdSsiErrInfo &eObj, // The error description
450 const char *op) // Operation being performed
451{
452 const char *eMsg;
453 char buffer[2048];
454 int eNum;
455
456// Count errors
457//
459
460// Get correct error code and message
461//
462 eMsg = eObj.Get(eNum).c_str();
463 if (eNum <= 0) eNum = EFAULT;
464 if (!eMsg || !(*eMsg)) eMsg = "reason unknown";
465
466// Format the error message
467//
468 snprintf(buffer, sizeof(buffer), "Unable to %s %s; %s", op, sessN, eMsg);
469
470// Put the message in the log
471//
472 Log.Emsg(pfx, tident, buffer);
473
474// Place the error message in the error object and return
475//
476 if (cbInfo) cbInfo->setErrInfo(eNum, buffer);
477 return SFS_ERROR;
478}
479
480/******************************************************************************/
481/* F i n a l i z e */
482/******************************************************************************/
483
485{
486 EPNAME("Finalize");
487 XrdSsiMutexMon mHelper(frqMutex);
488 bool cancel = (myState != odRsp);
489
490// Release any unsent alerts (prevent any new alerts from being accepted)
491//
492 isEnding = true;
493 if (alrtSent || alrtPend)
494 {XrdSsiAlert *dP, *aP = alrtSent;
495 if (aP) aP->next = alrtPend;
496 else aP = alrtPend;
497 mHelper.UnLock();
498 while((dP = aP)) {aP = aP->next; dP->Recycle();}
499 mHelper.Lock(frqMutex);
500 }
501
502// Processing is determined by the responder's state
503//
504 switch(urState)
505 // Request is being scheduled, so we can simply abort it.
506 //
507 {case isNew: urState = isAbort;
508 cbInfo = 0;
509 sessN = "???";
510 Stats.Bump(Stats.ReqAborts);
511 DEBUGXQ("Aborting request processing");
512 return;
513 break;
514
515 // Request already handed off but not yet bound. Defer until bound.
516 // We need to wait until this occurs to sequence Unprovision().
517 //
518 case isBegun: urState = isDone;
519 {XrdSysSemaphore wt4fin(0);
520 finWait = &wt4fin;
521 mHelper.UnLock();
522 wt4fin.Wait();
523 }
524 sessN = "n/a";
525 return;
526
527 // Request is bound so we can finish right off.
528 //
529 case isBound: urState = isDone;
530 if (strBuff) {strBuff->Recycle(); strBuff = 0;}
531 DEBUGXQ("Calling Finished(" <<cancel <<')');
532 if (respWait) WakeUp();
533 mHelper.UnLock();
534 Stats.Bump(Stats.ReqFinished);
535 if (cancel) Stats.Bump(Stats.ReqCancels);
536 Finished(cancel); // This object may be deleted!
537 sessN = "n/a";
538 return;
539 break;
540
541 // The following two cases may happen but it's safe to ignore them.
542 //
543 case isAbort:
544 case isDone: sessN = "bad";
545 return;
546 break;
547 default: break;
548 }
549
550// If we get here then we have an invalid state. Report it but otherwise we
551// can't really do anything else. This means some memory may be lost.
552//
553 Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
554}
555
556/******************************************************************************/
557/* G e t R e q u e s t */
558/******************************************************************************/
559
561{
562 EPNAME("GetRequest");
563
564// Do some debugging
565//
566 DEBUGXQ("sz=" <<reqSize);
567 Stats.Bump(Stats.ReqGets);
568
569// The request may come from a ouc buffer or an sfs buffer
570//
571 rLen = reqSize;
572 if (oucBuff) return oucBuff->Data();
573 return XrdSfsXio::Buffer(sfsBref);
574}
575
576/******************************************************************************/
577/* Private: I n i t */
578/******************************************************************************/
579
580void XrdSsiFileReq::Init(const char *cID)
581{
582 tident = (cID ? strdup(cID) : strdup("???"));
583 finWait = 0;
584 nextReq = 0;
585 cbInfo = 0;
586 respCB = 0;
587 respCBarg = 0;
588 alrtSent = 0;
589 alrtPend = 0;
590 alrtLast = 0;
591 sessN = "???";
592 oucBuff = 0;
593 sfsBref = 0;
594 strBuff = 0;
595 reqSize = 0;
596 respBuf = 0;
597 respOff = 0;
598 fileSz = 0; // Also does respLen = 0;
599 myState = wtReq;
600 urState = isNew;
601 *rID = 0;
602 schedDone = false;
603 haveResp = false;
604 respWait = false;
605 strmEOF = false;
606 isEnding = false;
608 XrdSsiRRAgent::SetMutex(this, &frqMutex);
609}
610
611/******************************************************************************/
612/* Protected: P r o c e s s R e s p o n s e */
613/******************************************************************************/
614
615// This is called via the responder with the responder and request locks held.
616
618 const XrdSsiRespInfo &Resp)
619{
620 EPNAME("ProcessResponse");
621
622// Do some debugging
623//
624 DEBUGXQ("Response presented wtr=" <<respWait);
625
626// Make sure we are still in execute state
627//
628 if (urState != isBegun && urState != isBound) return false;
629 myState = doRsp;
630 respOff = 0;
631
632// Handle the response
633//
634 switch(Resp.rType)
636 DEBUGXQ("Resp data sz="<<Resp.blen);
637 respLen = Resp.blen;
638 Stats.Bump(Stats.RspData);
639 break;
641 DEBUGXQ("Resp err rc="<<Resp.eNum<<" msg="<<Resp.eMsg);
642 respLen = 0;
643 Stats.Bump(Stats.RspErrs);
644 break;
646 DEBUGXQ("Resp file fd="<<Resp.fdnum<<" sz="<<Resp.fsize);
647 fileSz = Resp.fsize;
648 respOff = 0;
649 Stats.Bump(Stats.RspFile);
650 break;
652 DEBUGXQ("Resp strm");
653 respLen = 0;
654 Stats.Bump(Stats.RspStrm);
655 break;
656 default:
657 DEBUGXQ("Resp invalid!!!!");
658 return false;
659 Stats.Bump(Stats.RspBad);
660 break;
661 }
662
663// If the client is waiting for the response, wake up the client to get it.
664//
665 haveResp = true;
666 if (respWait) WakeUp();
667 return true;
668}
669
670/******************************************************************************/
671/* R e a d */
672/******************************************************************************/
673
675 char *buff, // Out
676 XrdSfsXferSize blen) // In
677/*
678 Function: Read `blen' bytes at `offset' into 'buff' and return the actual
679 number of bytes read.
680
681 Input: buff - Address of the buffer in which to place the data.
682 blen - The size of the buffer. This is the maximum number
683 of bytes that will be returned.
684
685 Output: Returns the number of bytes read upon success and SFS_ERROR o/w.
686*/
687{
688 static const char *epname = "read";
689 XrdSfsXferSize nbytes;
690 XrdSsiRespInfo const *Resp = XrdSsiRRAgent::RespP(this);
691
692// A read should never be issued unless a response has been set
693//
694 if (myState != doRsp)
695 {done = true;
696 return (myState == odRsp ? 0 : Emsg(epname, ENOMSG, "read"));
697 }
698
699// Fan out based on the kind of response we have
700//
701 switch(Resp->rType)
703 if (respLen <= 0) {done = true; myState = odRsp; return 0;}
704 if (blen >= respLen)
705 {memcpy(buff, Resp->buff+respOff, respLen);
706 blen = respLen; myState = odRsp; done = true;
707 } else {
708 memcpy(buff, Resp->buff+respOff, blen);
709 respLen -= blen; respOff += blen;
710 }
711 return blen;
712 break;
714 cbInfo->setErrInfo(Resp->eNum, Resp->eMsg);
715 myState = odRsp; done = true;
716 return SFS_ERROR;
717 break;
719 if (fileSz <= 0) {done = true; myState = odRsp; return 0;}
720 nbytes = pread(Resp->fdnum, buff, blen, respOff);
721 if (nbytes <= 0)
722 {done = true;
723 if (!nbytes) {myState = odRsp; return 0;}
724 myState = erRsp;
725 return Emsg(epname, errno, "read");
726 }
727 respOff += nbytes; fileSz -= nbytes;
728 return nbytes;
729 break;
731 nbytes = (Resp->strmP->Type() == XrdSsiStream::isActive ?
732 readStrmA(Resp->strmP, buff, blen)
733 : readStrmP(Resp->strmP, buff, blen));
734 done = strmEOF && strBuff == 0;
735 return nbytes;
736 break;
737 default: break;
738 };
739
740// We should never get here
741//
742 myState = erRsp;
743 done = true;
744 return Emsg(epname, EFAULT, "read");
745}
746
747/******************************************************************************/
748/* Private: r e a d S t r m A */
749/******************************************************************************/
750
751XrdSfsXferSize XrdSsiFileReq::readStrmA(XrdSsiStream *strmP,
752 char *buff, XrdSfsXferSize blen)
753{
754 static const char *epname = "readStrmA";
755 XrdSsiErrInfo eObj;
756 XrdSfsXferSize xlen = 0;
757
758
759// Copy out data from the stream to fill the buffer
760//
761do{if (strBuff)
762 {if (respLen > blen)
763 {memcpy(buff, strBuff->data+respOff, blen);
764 respLen -= blen; respOff += blen;
765 return xlen+blen;
766 }
767 memcpy(buff, strBuff->data+respOff, respLen);
768 xlen += respLen;
769 strBuff->Recycle(); strBuff = 0;
770 blen -= respLen; buff += respLen;
771 }
772
773 if (!strmEOF && blen)
774 {respLen = blen; respOff = 0;
775 strBuff = strmP->GetBuff(eObj, respLen, strmEOF);
776 }
777 } while(strBuff);
778
779// Check if we have data to return
780//
781 if (strmEOF) {myState = odRsp; return xlen;}
782 else if (!blen) return xlen;
783
784// Report the error
785//
786 myState = erRsp; strmEOF = true;
787 return Emsg(epname, eObj, "read stream");
788}
789
790/******************************************************************************/
791/* Private: r e a d S t r m P */
792/******************************************************************************/
793
794XrdSfsXferSize XrdSsiFileReq::readStrmP(XrdSsiStream *strmP,
795 char *buff, XrdSfsXferSize blen)
796{
797 static const char *epname = "readStrmP";
798 XrdSsiErrInfo eObj;
799 XrdSfsXferSize xlen = 0;
800 int dlen = 0;
801
802// Copy out data from the stream to fill the buffer
803//
804 while(!strmEOF && (dlen = strmP->SetBuff(eObj, buff, blen, strmEOF)) > 0)
805 {xlen += dlen;
806 if (dlen == blen) return xlen;
807 if (dlen > blen) {eObj.Set(0,EOVERFLOW); break;}
808 buff += dlen; blen -= dlen;
809 }
810
811// Check if we ended with an zero length read
812//
813 if (strmEOF || !dlen) {myState = odRsp; strmEOF = true; return xlen;}
814
815// Return an error
816//
817 myState = erRsp; strmEOF = true;
818 return Emsg(epname, eObj, "read stream");
819}
820
821/******************************************************************************/
822/* Private: R e c y c l e */
823/******************************************************************************/
824
825void XrdSsiFileReq::Recycle()
826{
827
828// If we have an oucbuffer then we need to recycle it, otherwise if we have
829// and sfs buffer, put it on the deferred release queue.
830//
831 if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
832 else if (sfsBref) {XrdSfsXio::Reclaim(sfsBref); sfsBref = 0;}
833 reqSize = 0;
834
835// Add to queue unless we have too many of these. If we add it back to the
836// queue; make sure it's a cleaned up object!
837//
838 aqMutex.Lock();
839 if (tident) {free(tident); tident = 0;}
840 if (freeCnt >= freeMax) {aqMutex.UnLock(); delete this;}
841 else {XrdSsiRRAgent::CleanUp(*this);
842 nextReq = freeReq;
843 freeReq = this;
844 freeCnt++;
845 aqMutex.UnLock();
846 }
847}
848
849/******************************************************************************/
850/* R e l R e q u e s t B u f f e r */
851/******************************************************************************/
852
854{
855 EPNAME("RelReqBuff");
856 XrdSsiMutexMon mHelper(frqMutex);
857
858// Do some debugging
859//
860 DEBUGXQ("called");
861 Stats.Bump(Stats.ReqRelBuf);
862
863// Release buffers
864//
865 if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
866 else if (sfsBref) {XrdSfsXio::Reclaim(sfsBref); sfsBref = 0;}
867 reqSize = 0;
868}
869
870/******************************************************************************/
871/* S e n d */
872/******************************************************************************/
873
875{
876 static const char *epname = "send";
877 XrdSsiRespInfo const *Resp = XrdSsiRRAgent::RespP(this);
878 XrdOucSFVec sfVec[2];
879 int rc;
880
881// A send should never be issued unless a response has been set. Return a
882// continuation which will cause Read() to be called to return the error.
883//
884 if (myState != doRsp) return 1;
885
886// Fan out based on the kind of response we have
887//
888 switch(Resp->rType)
890 if (blen > 0)
891 {sfVec[1].buffer = (char *)Resp->buff+respOff;
892 sfVec[1].fdnum = -1;
893 if (blen > respLen)
894 {blen = respLen; myState = odRsp;
895 } else {
896 respLen -= blen; respOff += blen;
897 }
898 } else blen = 0;
899 break;
901 return 1; // Causes error to be returned via Read()
902 break;
904 if (fileSz > 0)
905 {sfVec[1].offset = respOff; sfVec[1].fdnum = Resp->fdnum;
906 if (blen > fileSz)
907 {blen = fileSz; myState = odRsp;}
908 respOff += blen; fileSz -= blen;
909 } else blen = 0;
910 break;
912 if (Resp->strmP->Type() == XrdSsiStream::isPassive) return 1;
913 return sendStrmA(Resp->strmP, sfDio, blen);
914 break;
915 default: myState = erRsp;
916 return Emsg(epname, EFAULT, "send");
917 break;
918 };
919
920// Send off the data
921//
922 if (!blen) {sfVec[1].buffer = rID; myState = odRsp;}
923 sfVec[1].sendsz = blen;
924 rc = sfDio->SendFile(sfVec, 2);
925
926// If send succeeded, indicate the action to be taken
927//
928 if (!rc) return myState != odRsp;
929
930// The send failed, diagnose the problem
931//
932 rc = (rc < 0 ? EIO : EFAULT);
933 myState = erRsp;
934 return Emsg(epname, rc, "send");
935}
936
937/******************************************************************************/
938/* Private: s e n d S t r m A */
939/******************************************************************************/
940
941int XrdSsiFileReq::sendStrmA(XrdSsiStream *strmP,
942 XrdSfsDio *sfDio, XrdSfsXferSize blen)
943{
944 static const char *epname = "sendStrmA";
945 XrdSsiErrInfo eObj;
946 XrdOucSFVec sfVec[2];
947 int rc;
948
949// Check if we need a buffer
950//
951 if (!strBuff)
952 {respLen = blen;
953 if (strmEOF || !(strBuff = strmP->GetBuff(eObj, respLen, strmEOF)))
954 {myState = odRsp; strmEOF = true;
955 if (!strmEOF) Emsg(epname, eObj, "read stream");
956 return 1;
957 }
958 respOff = 0;
959 }
960
961// Complete the sendfile vector
962//
963 sfVec[1].buffer = strBuff->data+respOff;
964 sfVec[1].fdnum = -1;
965 if (respLen > blen)
966 {sfVec[1].sendsz = blen;
967 respLen -= blen; respOff += blen;
968 } else {
969 sfVec[1].sendsz = respLen;
970 respLen = 0;
971 }
972
973// Send off the data
974//
975 rc = sfDio->SendFile(sfVec, 2);
976
977// Release any completed buffer
978//
979 if (strBuff && !respLen) {strBuff->Recycle(); strBuff = 0;}
980
981// If send succeeded, indicate the action to be taken
982//
983 if (!rc) return myState != odRsp;
984
985// The send failed, diagnose the problem
986//
987 rc = (rc < 0 ? EIO : EFAULT);
988 myState = erRsp; strmEOF = true;
989 return Emsg(epname, rc, "send");
990}
991
992/******************************************************************************/
993/* W a n t R e s p o n s e */
994/******************************************************************************/
995
997{
998 EPNAME("WantResp");
999 XrdSsiMutexMon frqMon;
1000 const XrdSsiRespInfo *rspP;
1001
1002// Check if we have a previous alert that was sent (we need to recycle it). We
1003// don't need a lock for this as it's fully serialized via serial fsctl calls.
1004//
1005 if (alrtSent) {alrtSent->Recycle(); alrtSent = 0;}
1006
1007// Serialize the remainder of this code
1008//
1009 frqMon.Lock(frqMutex);
1010 rspP = XrdSsiRRAgent::RespP(this);
1011
1012// If we have a pending alert then we need to send it now. Suppress the callback
1013// as we will recycle the alert on the next call (there should be one).
1014//
1015 if (alrtPend)
1016 {char hexBuff[16], binBuff[8], dotBuff[4];
1017 alrtSent = alrtPend;
1018 if (!(alrtPend = alrtPend->next)) alrtLast = 0;
1019 int n = alrtSent->SetInfo(eInfo, binBuff, sizeof(binBuff));
1020 eInfo.setErrCB((XrdOucEICB *)0);
1021 DEBUGXQ(n <<" byte alert (0x" <<DUMPIT(binBuff, n) <<") sent; "
1022 <<(alrtPend ? "" : "no ") <<"more pending");
1023 return true;
1024 }
1025
1026// Check if a response is here (well, ProcessResponse was called)
1027//
1028// if (rspP->rType)
1029 if (haveResp)
1030 {respCBarg = 0;
1031 if (fileP->AttnInfo(eInfo, rspP, reqID)) myState = odRsp;
1032 else eInfo.setErrCB((XrdOucEICB *)0);
1033 return true;
1034 }
1035
1036// Defer this and record the callback arguments. We defer setting respWait
1037// to true until we know the deferral request has been sent (i.e. when Done()
1038// is called). This forces ProcessResponse() to not prematurely wakeup the
1039// client. This is necessitated by the fact that we must release the request
1040// lock upon return; allowing a response to come in while the deferral request
1041// is still in transit.
1042//
1043 respCB = eInfo.getErrCB(respCBarg);
1044 respWait = false;
1045 return false;
1046}
1047
1048/******************************************************************************/
1049/* Private: W a k e U p */
1050/******************************************************************************/
1051
1052void XrdSsiFileReq::WakeUp(XrdSsiAlert *aP) // Called with frqMutex locked!
1053{
1054 EPNAME("WakeUp");
1055 XrdOucErrInfo *wuInfo =
1056 new XrdOucErrInfo(tident,(XrdOucEICB *)0,respCBarg);
1057 const XrdSsiRespInfo *rspP = XrdSsiRRAgent::RespP(this);
1058 int respCode = SFS_DATAVEC;
1059
1060// Do some debugging
1061//
1062 DEBUGXQ("respCBarg=" <<Xrd::hex <<respCBarg <<Xrd::dec);
1063
1064// Setup the wakeup data. This may be for an alert or for an actual response.
1065// If this is an alert or the complete response, then make sure we get a
1066// callback to do the finalization. Otherwise, we don't need a callback
1067// and the callback handler will simply delete the error object for us.
1068//
1069 if (aP)
1070 {char hexBuff[16], binBuff[8], dotBuff[4];
1071 int n = aP->SetInfo(*wuInfo, binBuff, sizeof(binBuff));
1072 wuInfo->setErrCB((XrdOucEICB *)aP, respCBarg);
1073 DEBUGXQ(n <<" byte alert (0x" <<DUMPIT(binBuff, n) <<") sent; "
1074 <<(alrtPend ? "" : "no ") <<"more pending");
1075 } else {
1076 if (fileP->AttnInfo(*wuInfo, rspP, reqID))
1077 {wuInfo->setErrArg(respCBarg); myState = odRsp;}
1078 }
1079
1080// Tell the client to issue a read now or handle the alert or full response.
1081//
1082 respWait = false;
1083 respCB->Done(respCode, wuInfo, sessN);
1085}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define tident
#define EPNAME(x)
#define pread(a, b, c, d)
Definition XrdPosix.hh:80
#define eMsg(x)
#define SFS_DATAVEC
#define SFS_ERROR
int XrdSfsXferSize
class XrdBuffer * XrdSfsXioHandle
Definition XrdSfsXio.hh:46
#define DEBUGXQ(x)
#define DUMPIT(x, y)
XrdJob(const char *desc="")
Definition XrdJob.hh:51
XrdOucEICB()
Constructor and destructor.
static int Format(char *buff, int blen, int ecode, const char *etxt1, const char *etxt2=0)
XrdOucEICB * getErrCB()
void setErrArg(unsigned long long cbarg=0)
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
void Bump(int &val)
virtual int SendFile(int fildes)=0
static void Reclaim(XrdSfsXioHandle theHand)
Definition XrdSfsXio.cc:70
static char * Buffer(XrdSfsXioHandle theHand, int *buffsz=0)
Definition XrdSfsXio.cc:61
void Recycle()
static XrdSsiAlert * Alloc(XrdSsiRespInfoMsg &aMsg)
int SetInfo(XrdOucErrInfo &eInfo, char *aMsg, int aLen)
XrdSsiAlert * next
void Set(const char *eMsg=0, int eNum=0, int eArg=0)
const std::string & Get(int &eNum) const
void Alert(XrdSsiRespInfoMsg &aMsg)
Send or receive a server generated alert.
bool WantResponse(XrdOucErrInfo &eInfo)
XrdSfsXferSize Read(bool &done, char *buffer, XrdSfsXferSize blen)
char * GetRequest(int &rLen)
void Finished(XrdSsiRequest &rqstR, const XrdSsiRespInfo &rInfo, bool cancel=false)
void RelRequestBuffer()
bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &resp)
void DeferredFinalize()
XrdSsiFileReq(const char *cID=0)
int Send(XrdSfsDio *sfDio, XrdSfsXferSize size)
static XrdSsiFileReq * Alloc(XrdOucErrInfo *eP, XrdSsiFileResource *rP, XrdSsiFileSess *fP, const char *sn, const char *id, unsigned int rnum)
void Activate(XrdOucBuffer *oP, XrdSfsXioHandle bR, int rSz)
void Done(int &Result, XrdOucErrInfo *cbInfo, const char *path=0)
void Lock(XrdSsiMutex *mutex)
static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
static void onServer(XrdSsiRequest *rP)
static XrdSsiRespInfo * RespP(XrdSsiRequest *rP)
static void CleanUp(XrdSsiRequest &reqR)
XrdSsiRequest(const char *reqid=0, uint16_t tmo=0)
char * GetMsg(int &mlen)
virtual void RecycleMsg(bool sent=true)=0
virtual bool SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen)
virtual Buffer * GetBuff(XrdSsiErrInfo &eRef, int &dlen, bool &last)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSsiStats Stats
XrdSsiService * Service
XrdScheduler * Sched
XrdSysError Log
int fdnum
File descriptor for data.
int sendsz
Length of data at offset.