summaryrefslogtreecommitdiff
path: root/lib/tp.c
blob: 7462f5bfa824cbfc7fff44a50fe76ee8fe0919a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
 * Basic workqueue like code, that sets up a thread and allows async
 * processing of some sort. Could be extended to allow for multiple
 * worker threads. But right now fio associates one of this per IO
 * thread, so should be enough to have just a single thread doing the
 * work.
 */
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>

#include "../smalloc.h"
#include "../log.h"
#include "tp.h"

static void tp_flush_work(struct flist_head *list)
{
	struct tp_work *work;

	while (!flist_empty(list)) {
		int prio;

		work = flist_entry(list->next, struct tp_work, list);
		flist_del(&work->list);

		prio = work->prio;
		if (nice(prio) < 0)
			log_err("fio: nice %s\n", strerror(errno));

		work->fn(work);

		if (nice(prio) < 0)
			log_err("fio: nice %s\n", strerror(errno));
	}
}

static void *tp_thread(void *data)
{
	struct tp_data *tdat = data;
	struct flist_head work_list;

	INIT_FLIST_HEAD(&work_list);

	while (1) {
		pthread_mutex_lock(&tdat->lock);

		if (!tdat->thread_exit && flist_empty(&tdat->work))
			pthread_cond_wait(&tdat->cv, &tdat->lock);

		if (!flist_empty(&tdat->work))
			flist_splice_tail_init(&tdat->work, &work_list);

		pthread_mutex_unlock(&tdat->lock);

		if (flist_empty(&work_list)) {
			if (tdat->thread_exit)
				break;
			continue;
		}

		tp_flush_work(&work_list);
	}

	return NULL;
}

void tp_queue_work(struct tp_data *tdat, struct tp_work *work)
{
	work->done = 0;

	pthread_mutex_lock(&tdat->lock);
	flist_add_tail(&work->list, &tdat->work);
	pthread_mutex_unlock(&tdat->lock);

	pthread_cond_signal(&tdat->cv);
}

void tp_init(struct tp_data **tdatp)
{
	struct tp_data *tdat;
	int ret;

	if (*tdatp)
		return;

	*tdatp = tdat = smalloc(sizeof(*tdat));
	pthread_mutex_init(&tdat->lock, NULL);
	INIT_FLIST_HEAD(&tdat->work);
	pthread_cond_init(&tdat->cv, NULL);
	pthread_cond_init(&tdat->sleep_cv, NULL);

	ret = pthread_create(&tdat->thread, NULL, tp_thread, tdat);
	if (ret)
		log_err("fio: failed to create tp thread\n");
}

void tp_exit(struct tp_data **tdatp)
{
	struct tp_data *tdat = *tdatp;
	void *ret;

	if (!tdat)
		return;

	pthread_mutex_lock(&tdat->lock);
	tdat->thread_exit = 1;
	pthread_mutex_unlock(&tdat->lock);

	pthread_cond_signal(&tdat->cv);

	pthread_join(tdat->thread, &ret);

	sfree(tdat);
	*tdatp = NULL;
}