VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqqueue.cpp@ 60121

Last change on this file since 60121 was 60121, checked in by vboxsync, 9 years ago

RTReqQueueProcess: Addressed todo regarding lost requests and document behavior more accuratly.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 13.2 KB
Line 
1/* $Id: reqqueue.cpp 60121 2016-03-21 14:28:23Z vboxsync $ */
2/** @file
3 * IPRT - Request Queue.
4 */
5
6/*
7 * Copyright (C) 2006-2015 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.215389.xyz. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/string.h>
37#include <iprt/time.h>
38#include <iprt/semaphore.h>
39#include <iprt/thread.h>
40#include <iprt/log.h>
41#include <iprt/mem.h>
42
43#include "internal/req.h"
44#include "internal/magics.h"
45
46
47
48RTDECL(int) RTReqQueueCreate(RTREQQUEUE *phQueue)
49{
50 PRTREQQUEUEINT pQueue = (PRTREQQUEUEINT)RTMemAllocZ(sizeof(RTREQQUEUEINT));
51 if (!pQueue)
52 return VERR_NO_MEMORY;
53 int rc = RTSemEventCreate(&pQueue->EventSem);
54 if (RT_SUCCESS(rc))
55 {
56 pQueue->u32Magic = RTREQQUEUE_MAGIC;
57
58 *phQueue = pQueue;
59 return VINF_SUCCESS;
60 }
61
62 RTMemFree(pQueue);
63 return rc;
64}
65RT_EXPORT_SYMBOL(RTReqQueueCreate);
66
67
68RTDECL(int) RTReqQueueDestroy(RTREQQUEUE hQueue)
69{
70 /*
71 * Check input.
72 */
73 if (hQueue == NIL_RTREQQUEUE)
74 return VINF_SUCCESS;
75 PRTREQQUEUEINT pQueue = hQueue;
76 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
77 AssertReturn(ASMAtomicCmpXchgU32(&pQueue->u32Magic, RTREQQUEUE_MAGIC_DEAD, RTREQQUEUE_MAGIC), VERR_INVALID_HANDLE);
78
79 RTSemEventDestroy(pQueue->EventSem);
80 pQueue->EventSem = NIL_RTSEMEVENT;
81
82 for (unsigned i = 0; i < RT_ELEMENTS(pQueue->apReqFree); i++)
83 {
84 PRTREQ pReq = (PRTREQ)ASMAtomicXchgPtr((void **)&pQueue->apReqFree[i], NULL);
85 while (pReq)
86 {
87 PRTREQ pNext = pReq->pNext;
88 rtReqFreeIt(pReq);
89 pReq = pNext;
90 }
91 }
92
93 RTMemFree(pQueue);
94 return VINF_SUCCESS;
95}
96RT_EXPORT_SYMBOL(RTReqQueueDestroy);
97
98
99RTDECL(int) RTReqQueueProcess(RTREQQUEUE hQueue, RTMSINTERVAL cMillies)
100{
101 LogFlow(("RTReqQueueProcess %x\n", hQueue));
102
103 /*
104 * Check input.
105 */
106 PRTREQQUEUEINT pQueue = hQueue;
107 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
108 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
109
110 /*
111 * Process loop. Stop (break) after the first non-VINF_SUCCESS status code.
112 */
113 int rc = VINF_SUCCESS;
114 {
115 /*
116 * Get pending requests.
117 */
118 PRTREQ pReqs = ASMAtomicXchgPtrT(&pQueue->pAlreadyPendingReqs, NULL, PRTREQ);
119 if (RT_LIKELY(!pReqs))
120 {
121 PRTREQ pReqs = ASMAtomicXchgPtrT(&pQueue->pReqs, NULL, PRTREQ);
122 if (!pReqs)
123 {
124 /* We do not adjust cMillies (documented behavior). */
125 ASMAtomicWriteBool(&pQueue->fBusy, false); /* this aint 100% perfect, but it's good enough for now... */
126 rc = RTSemEventWait(pQueue->EventSem, cMillies);
127 if (rc != VINF_SUCCESS)
128 break;
129 continue;
130 }
131
132 ASMAtomicWriteBool(&pQueue->fBusy, true);
133
134 /*
135 * Reverse the list to process it in FIFO order.
136 */
137 PRTREQ pReq = pReqs;
138 if (pReq->pNext)
139 Log2(("RTReqQueueProcess: 2+ requests: %p %p %p\n", pReq, pReq->pNext, pReq->pNext->pNext));
140 pReqs = NULL;
141 while (pReq)
142 {
143 Assert(pReq->enmState == RTREQSTATE_QUEUED);
144 Assert(pReq->uOwner.hQueue == pQueue);
145 PRTREQ pCur = pReq;
146 pReq = pReq->pNext;
147 pCur->pNext = pReqs;
148 pReqs = pCur;
149 }
150
151 }
152 else
153 ASMAtomicWriteBool(&pQueue->fBusy, true);
154
155 /*
156 * Process the requests.
157 */
158 while (pReqs)
159 {
160 /* Unchain the first request and advance the list. */
161 PRTREQ pReq = pReqs;
162 pReqs = pReqs->pNext;
163 pReq->pNext = NULL;
164
165 /* Process the request. */
166 rc = rtReqProcessOne(pReq);
167 AssertRC(rc);
168 if (rc != VINF_SUCCESS)
169 {
170 /* Propagate the return code to caller. If more requests pending, queue them for later. */
171 if (pReqs)
172 {
173 pReqs = ASMAtomicXchgPtrT(&pQueue->pAlreadyPendingReqs, pReqs, PRTREQ);
174 Assert(!pReqs);
175 }
176 break;
177 }
178 }
179 if (rc != VINF_SUCCESS)
180 break;
181 }
182
183 LogFlow(("RTReqQueueProcess: returns %Rrc\n", rc));
184 return rc;
185}
186RT_EXPORT_SYMBOL(RTReqQueueProcess);
187
188
189RTDECL(int) RTReqQueueCall(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, PFNRT pfnFunction, unsigned cArgs, ...)
190{
191 va_list va;
192 va_start(va, cArgs);
193 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, RTREQFLAGS_IPRT_STATUS, pfnFunction, cArgs, va);
194 va_end(va);
195 return rc;
196}
197RT_EXPORT_SYMBOL(RTReqQueueCall);
198
199
200RTDECL(int) RTReqQueueCallVoid(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, PFNRT pfnFunction, unsigned cArgs, ...)
201{
202 va_list va;
203 va_start(va, cArgs);
204 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, RTREQFLAGS_VOID, pfnFunction, cArgs, va);
205 va_end(va);
206 return rc;
207}
208RT_EXPORT_SYMBOL(RTReqQueueCallVoid);
209
210
211RTDECL(int) RTReqQueueCallEx(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, unsigned fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
212{
213 va_list va;
214 va_start(va, cArgs);
215 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, fFlags, pfnFunction, cArgs, va);
216 va_end(va);
217 return rc;
218}
219RT_EXPORT_SYMBOL(RTReqQueueCallEx);
220
221
222RTDECL(int) RTReqQueueCallV(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, unsigned fFlags, PFNRT pfnFunction, unsigned cArgs, va_list Args)
223{
224 LogFlow(("RTReqQueueCallV: cMillies=%d fFlags=%#x pfnFunction=%p cArgs=%d\n", cMillies, fFlags, pfnFunction, cArgs));
225
226 /*
227 * Check input.
228 */
229 PRTREQQUEUEINT pQueue = hQueue;
230 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
231 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
232 AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
233 AssertReturn(!(fFlags & ~(RTREQFLAGS_RETURN_MASK | RTREQFLAGS_NO_WAIT)), VERR_INVALID_PARAMETER);
234
235 if (!(fFlags & RTREQFLAGS_NO_WAIT) || ppReq)
236 {
237 AssertPtrReturn(ppReq, VERR_INVALID_POINTER);
238 *ppReq = NULL;
239 }
240
241 PRTREQ pReq = NULL;
242 AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
243
244 /*
245 * Allocate request
246 */
247 int rc = RTReqQueueAlloc(pQueue, RTREQTYPE_INTERNAL, &pReq);
248 if (rc != VINF_SUCCESS)
249 return rc;
250
251 /*
252 * Initialize the request data.
253 */
254 pReq->fFlags = fFlags;
255 pReq->u.Internal.pfn = pfnFunction;
256 pReq->u.Internal.cArgs = cArgs;
257 for (unsigned iArg = 0; iArg < cArgs; iArg++)
258 pReq->u.Internal.aArgs[iArg] = va_arg(Args, uintptr_t);
259
260 /*
261 * Queue the request and return.
262 */
263 rc = RTReqSubmit(pReq, cMillies);
264 if ( rc != VINF_SUCCESS
265 && rc != VERR_TIMEOUT)
266 {
267 RTReqRelease(pReq);
268 pReq = NULL;
269 }
270 if (!(fFlags & RTREQFLAGS_NO_WAIT))
271 {
272 *ppReq = pReq;
273 LogFlow(("RTReqQueueCallV: returns %Rrc *ppReq=%p\n", rc, pReq));
274 }
275 else
276 LogFlow(("RTReqQueueCallV: returns %Rrc\n", rc));
277 Assert(rc != VERR_INTERRUPTED);
278 return rc;
279}
280RT_EXPORT_SYMBOL(RTReqQueueCallV);
281
282
283RTDECL(bool) RTReqQueueIsBusy(RTREQQUEUE hQueue)
284{
285 PRTREQQUEUEINT pQueue = hQueue;
286 AssertPtrReturn(pQueue, false);
287
288 if (ASMAtomicReadBool(&pQueue->fBusy))
289 return true;
290 if (ASMAtomicReadPtrT(&pQueue->pReqs, PRTREQ) != NULL)
291 return true;
292 if (ASMAtomicReadBool(&pQueue->fBusy))
293 return true;
294 return false;
295}
296RT_EXPORT_SYMBOL(RTReqQueueIsBusy);
297
298
299/**
300 * Joins the list pList with whatever is linked up at *pHead.
301 */
302static void vmr3ReqJoinFreeSub(volatile PRTREQ *ppHead, PRTREQ pList)
303{
304 for (unsigned cIterations = 0;; cIterations++)
305 {
306 PRTREQ pHead = ASMAtomicXchgPtrT(ppHead, pList, PRTREQ);
307 if (!pHead)
308 return;
309 PRTREQ pTail = pHead;
310 while (pTail->pNext)
311 pTail = pTail->pNext;
312 pTail->pNext = pList;
313 if (ASMAtomicCmpXchgPtr(ppHead, pHead, pList))
314 return;
315 pTail->pNext = NULL;
316 if (ASMAtomicCmpXchgPtr(ppHead, pHead, NULL))
317 return;
318 pList = pHead;
319 Assert(cIterations != 32);
320 Assert(cIterations != 64);
321 }
322}
323
324
325/**
326 * Joins the list pList with whatever is linked up at *pHead.
327 */
328static void vmr3ReqJoinFree(PRTREQQUEUEINT pQueue, PRTREQ pList)
329{
330 /*
331 * Split the list if it's too long.
332 */
333 unsigned cReqs = 1;
334 PRTREQ pTail = pList;
335 while (pTail->pNext)
336 {
337 if (cReqs++ > 25)
338 {
339 const uint32_t i = pQueue->iReqFree;
340 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(i + 2) % RT_ELEMENTS(pQueue->apReqFree)], pTail->pNext);
341
342 pTail->pNext = NULL;
343 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(i + 2 + (i == pQueue->iReqFree)) % RT_ELEMENTS(pQueue->apReqFree)], pTail->pNext);
344 return;
345 }
346 pTail = pTail->pNext;
347 }
348 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(pQueue->iReqFree + 2) % RT_ELEMENTS(pQueue->apReqFree)], pList);
349}
350
351
352RTDECL(int) RTReqQueueAlloc(RTREQQUEUE hQueue, RTREQTYPE enmType, PRTREQ *phReq)
353{
354 /*
355 * Validate input.
356 */
357 PRTREQQUEUEINT pQueue = hQueue;
358 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
359 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
360 AssertMsgReturn(enmType > RTREQTYPE_INVALID && enmType < RTREQTYPE_MAX, ("%d\n", enmType), VERR_RT_REQUEST_INVALID_TYPE);
361
362 /*
363 * Try get a recycled packet.
364 *
365 * While this could all be solved with a single list with a lock, it's a sport
366 * of mine to avoid locks.
367 */
368 int cTries = RT_ELEMENTS(pQueue->apReqFree) * 2;
369 while (--cTries >= 0)
370 {
371 PRTREQ volatile *ppHead = &pQueue->apReqFree[ASMAtomicIncU32(&pQueue->iReqFree) % RT_ELEMENTS(pQueue->apReqFree)];
372 PRTREQ pReq = ASMAtomicXchgPtrT(ppHead, NULL, PRTREQ);
373 if (pReq)
374 {
375 PRTREQ pNext = pReq->pNext;
376 if ( pNext
377 && !ASMAtomicCmpXchgPtr(ppHead, pNext, NULL))
378 vmr3ReqJoinFree(pQueue, pReq->pNext);
379 ASMAtomicDecU32(&pQueue->cReqFree);
380
381 Assert(pReq->uOwner.hQueue == pQueue);
382 Assert(!pReq->fPoolOrQueue);
383
384 int rc = rtReqReInit(pReq, enmType);
385 if (RT_SUCCESS(rc))
386 {
387 *phReq = pReq;
388 LogFlow(("RTReqQueueAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
389 return VINF_SUCCESS;
390 }
391 }
392 }
393
394 /*
395 * Ok, allocate a new one.
396 */
397 int rc = rtReqAlloc(enmType, false /*fPoolOrQueue*/, pQueue, phReq);
398 LogFlow(("RTReqQueueAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
399 return rc;
400}
401RT_EXPORT_SYMBOL(RTReqQueueAlloc);
402
403
404/**
405 * Recycles a requst.
406 *
407 * @returns true if recycled, false if it should be freed.
408 * @param pQueue The queue.
409 * @param pReq The request.
410 */
411DECLHIDDEN(bool) rtReqQueueRecycle(PRTREQQUEUEINT pQueue, PRTREQINT pReq)
412{
413 if ( !pQueue
414 || pQueue->cReqFree >= 128)
415 return false;
416
417 ASMAtomicIncU32(&pQueue->cReqFree);
418 PRTREQ volatile *ppHead = &pQueue->apReqFree[ASMAtomicIncU32(&pQueue->iReqFree) % RT_ELEMENTS(pQueue->apReqFree)];
419 PRTREQ pNext;
420 do
421 {
422 pNext = *ppHead;
423 ASMAtomicWritePtr(&pReq->pNext, pNext);
424 } while (!ASMAtomicCmpXchgPtr(ppHead, pReq, pNext));
425
426 return true;
427}
428
429
430/**
431 * Submits a request to the queue.
432 *
433 * @param pQueue The queue.
434 * @param pReq The request.
435 */
436DECLHIDDEN(void) rtReqQueueSubmit(PRTREQQUEUEINT pQueue, PRTREQINT pReq)
437{
438 PRTREQ pNext;
439 do
440 {
441 pNext = pQueue->pReqs;
442 pReq->pNext = pNext;
443 ASMAtomicWriteBool(&pQueue->fBusy, true);
444 } while (!ASMAtomicCmpXchgPtr(&pQueue->pReqs, pReq, pNext));
445
446 /*
447 * Notify queue thread.
448 */
449 RTSemEventSignal(pQueue->EventSem);
450}
451
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette