52
52
#define IO_WORKER_WAKEUP_FANOUT 2
53
53
54
54
55
- typedef struct AioWorkerSubmissionQueue
55
+ typedef struct PgAioWorkerSubmissionQueue
56
56
{
57
57
uint32 size ;
58
58
uint32 mask ;
59
59
uint32 head ;
60
60
uint32 tail ;
61
- uint32 ios [FLEXIBLE_ARRAY_MEMBER ];
62
- } AioWorkerSubmissionQueue ;
61
+ uint32 sqes [FLEXIBLE_ARRAY_MEMBER ];
62
+ } PgAioWorkerSubmissionQueue ;
63
63
64
- typedef struct AioWorkerSlot
64
+ typedef struct PgAioWorkerSlot
65
65
{
66
66
Latch * latch ;
67
67
bool in_use ;
68
- } AioWorkerSlot ;
68
+ } PgAioWorkerSlot ;
69
69
70
- typedef struct AioWorkerControl
70
+ typedef struct PgAioWorkerControl
71
71
{
72
72
uint64 idle_worker_mask ;
73
- AioWorkerSlot workers [FLEXIBLE_ARRAY_MEMBER ];
74
- } AioWorkerControl ;
73
+ PgAioWorkerSlot workers [FLEXIBLE_ARRAY_MEMBER ];
74
+ } PgAioWorkerControl ;
75
75
76
76
77
77
static size_t pgaio_worker_shmem_size (void );
@@ -96,8 +96,8 @@ int io_workers = 3;
96
96
97
97
static int io_worker_queue_size = 64 ;
98
98
static int MyIoWorkerId ;
99
- static AioWorkerSubmissionQueue * io_worker_submission_queue ;
100
- static AioWorkerControl * io_worker_control ;
99
+ static PgAioWorkerSubmissionQueue * io_worker_submission_queue ;
100
+ static PgAioWorkerControl * io_worker_control ;
101
101
102
102
103
103
static size_t
@@ -106,15 +106,15 @@ pgaio_worker_queue_shmem_size(int *queue_size)
106
106
/* Round size up to next power of two so we can make a mask. */
107
107
* queue_size = pg_nextpower2_32 (io_worker_queue_size );
108
108
109
- return offsetof(AioWorkerSubmissionQueue , ios ) +
109
+ return offsetof(PgAioWorkerSubmissionQueue , sqes ) +
110
110
sizeof (uint32 ) * * queue_size ;
111
111
}
112
112
113
113
static size_t
114
114
pgaio_worker_control_shmem_size (void )
115
115
{
116
- return offsetof(AioWorkerControl , workers ) +
117
- sizeof (AioWorkerSlot ) * MAX_IO_WORKERS ;
116
+ return offsetof(PgAioWorkerControl , workers ) +
117
+ sizeof (PgAioWorkerSlot ) * MAX_IO_WORKERS ;
118
118
}
119
119
120
120
static size_t
@@ -162,7 +162,7 @@ pgaio_worker_shmem_init(bool first_time)
162
162
}
163
163
164
164
static int
165
- pgaio_choose_idle_worker (void )
165
+ pgaio_worker_choose_idle (void )
166
166
{
167
167
int worker ;
168
168
@@ -180,7 +180,7 @@ pgaio_choose_idle_worker(void)
180
180
static bool
181
181
pgaio_worker_submission_queue_insert (PgAioHandle * ioh )
182
182
{
183
- AioWorkerSubmissionQueue * queue ;
183
+ PgAioWorkerSubmissionQueue * queue ;
184
184
uint32 new_head ;
185
185
186
186
queue = io_worker_submission_queue ;
@@ -192,7 +192,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
192
192
return false; /* full */
193
193
}
194
194
195
- queue -> ios [queue -> head ] = pgaio_io_get_id (ioh );
195
+ queue -> sqes [queue -> head ] = pgaio_io_get_id (ioh );
196
196
queue -> head = new_head ;
197
197
198
198
return true;
@@ -201,14 +201,14 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
201
201
static uint32
202
202
pgaio_worker_submission_queue_consume (void )
203
203
{
204
- AioWorkerSubmissionQueue * queue ;
204
+ PgAioWorkerSubmissionQueue * queue ;
205
205
uint32 result ;
206
206
207
207
queue = io_worker_submission_queue ;
208
208
if (queue -> tail == queue -> head )
209
209
return UINT32_MAX ; /* empty */
210
210
211
- result = queue -> ios [queue -> tail ];
211
+ result = queue -> sqes [queue -> tail ];
212
212
queue -> tail = (queue -> tail + 1 ) & (queue -> size - 1 );
213
213
214
214
return result ;
@@ -241,37 +241,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
241
241
}
242
242
243
243
static void
244
- pgaio_worker_submit_internal (int nios , PgAioHandle * ios [] )
244
+ pgaio_worker_submit_internal (int num_staged_ios , PgAioHandle * * staged_ios )
245
245
{
246
246
PgAioHandle * synchronous_ios [PGAIO_SUBMIT_BATCH_SIZE ];
247
247
int nsync = 0 ;
248
248
Latch * wakeup = NULL ;
249
249
int worker ;
250
250
251
- Assert (nios <= PGAIO_SUBMIT_BATCH_SIZE );
251
+ Assert (num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE );
252
252
253
253
LWLockAcquire (AioWorkerSubmissionQueueLock , LW_EXCLUSIVE );
254
- for (int i = 0 ; i < nios ; ++ i )
254
+ for (int i = 0 ; i < num_staged_ios ; ++ i )
255
255
{
256
- Assert (!pgaio_worker_needs_synchronous_execution (ios [i ]));
257
- if (!pgaio_worker_submission_queue_insert (ios [i ]))
256
+ Assert (!pgaio_worker_needs_synchronous_execution (staged_ios [i ]));
257
+ if (!pgaio_worker_submission_queue_insert (staged_ios [i ]))
258
258
{
259
259
/*
260
260
* We'll do it synchronously, but only after we've sent as many as
261
261
* we can to workers, to maximize concurrency.
262
262
*/
263
- synchronous_ios [nsync ++ ] = ios [i ];
263
+ synchronous_ios [nsync ++ ] = staged_ios [i ];
264
264
continue ;
265
265
}
266
266
267
267
if (wakeup == NULL )
268
268
{
269
269
/* Choose an idle worker to wake up if we haven't already. */
270
- worker = pgaio_choose_idle_worker ();
270
+ worker = pgaio_worker_choose_idle ();
271
271
if (worker >= 0 )
272
272
wakeup = io_worker_control -> workers [worker ].latch ;
273
273
274
- pgaio_debug_io (DEBUG4 , ios [i ],
274
+ pgaio_debug_io (DEBUG4 , staged_ios [i ],
275
275
"choosing worker %d" ,
276
276
worker );
277
277
}
@@ -490,7 +490,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
490
490
IO_WORKER_WAKEUP_FANOUT );
491
491
for (int i = 0 ; i < nwakeups ; ++ i )
492
492
{
493
- if ((worker = pgaio_choose_idle_worker ()) < 0 )
493
+ if ((worker = pgaio_worker_choose_idle ()) < 0 )
494
494
break ;
495
495
latches [nlatches ++ ] = io_worker_control -> workers [worker ].latch ;
496
496
}
0 commit comments