aboutsummaryrefslogtreecommitdiff
path: root/src/worker.h
blob: 339820802b3432e3d4156265868c3c31f4c93213 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
// Copyright 2006 Google Inc. All Rights Reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

//      http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// worker.h : worker thread interface

// This file contains the Worker Thread class interface
// for the SAT test. Worker Threads implement a repetitive
// task used to test or stress the system.

#ifndef STRESSAPPTEST_WORKER_H_
#define STRESSAPPTEST_WORKER_H_

#include <pthread.h>

#include <sys/time.h>
#include <sys/types.h>

#ifdef HAVE_LIBAIO_H
#include <libaio.h>
#endif

#include <queue>
#include <set>
#include <string>
#include <vector>

// This file must work with autoconf on its public version,
// so these includes are correct.
#include "disk_blocks.h"
#include "queue.h"
#include "sattypes.h"


// Global data structure shared by the Cache Coherency Worker Threads
// (CpuCacheCoherencyThread holds a pointer to these).
// NOTE(review): CpuCacheCoherencyThread documents its cc_thread_num_ as an
// index into "the integer array of the cacheline datastructure", which
// suggests `num` points at per-thread counters; confirm element type/size in
// worker.cc, since the field itself is declared as char*.
struct cc_cacheline_data {
  char *num;
};

// Typical usage:
// (Other workflows may be possible, see function comments for details.)
// - Control thread creates object.
// - Control thread calls AddWorkers(1) for each worker thread.
// - Control thread calls Initialize().
// - Control thread launches worker threads.
// - Every worker thread frequently calls ContinueRunning().
// - Control thread periodically calls PauseWorkers(), effectively sleeps, and
//     then calls ResumeWorkers().
// - Some worker threads may exit early, before StopWorkers() is called.  They
//     call RemoveSelf() after their last call to ContinueRunning().
// - Control thread eventually calls StopWorkers().
// - Worker threads exit.
// - Control thread joins worker threads.
// - Control thread calls Destroy().
// - Control thread destroys object.
//
// Threadsafety:
// - ContinueRunning() may be called concurrently by different workers, but not
//     by a single worker.
// - No other methods may ever be called concurrently, with themselves or
//     eachother.
// - This object may be used by multiple threads only between Initialize() and
//     Destroy().
//
// TODO(matthewb): Move this class and its unittest to their own files.
class WorkerStatus {
 public:
  //--------------------------------
  // Methods for the control thread.
  //--------------------------------

  WorkerStatus() : num_workers_(0), status_(RUN) {}

  // Called by the control thread to increase the worker count.  Must be called
  // before Initialize().  The worker count is 0 upon object initialization.
  void AddWorkers(int num_new_workers) {
    // No need to lock num_workers_mutex_ because this is before Initialize().
    num_workers_ += num_new_workers;
  }

  // Called by the control thread.  May not be called multiple times.  If
  // called, Destroy() must be called before destruction.
  void Initialize();

  // Called by the control thread after joining all worker threads.  Must be
  // called iff Initialize() was called.  No methods may be called after calling
  // this.
  void Destroy();

  // Called by the control thread to tell the workers to pause.  Does not return
  // until all workers have called ContinueRunning() or RemoveSelf().  May only
  // be called between Initialize() and Stop().  Must not be called multiple
  // times without ResumeWorkers() having been called inbetween.
  void PauseWorkers();

  // Called by the control thread to tell the workers to resume from a pause.
  // May only be called between Initialize() and Stop().  May only be called
  // directly after PauseWorkers().
  void ResumeWorkers();

  // Called by the control thread to tell the workers to stop.  May only be
  // called between Initialize() and Destroy().  May only be called once.
  void StopWorkers();

  //--------------------------------
  // Methods for the worker threads.
  //--------------------------------

  // Called by worker threads to decrease the worker count by one.  May only be
  // called between Initialize() and Destroy().  May wait for ResumeWorkers()
  // when called after PauseWorkers().
  void RemoveSelf();

  // Called by worker threads between Initialize() and Destroy().  May be called
  // any number of times.  Return value is whether or not the worker should
  // continue running.  When called after PauseWorkers(), does not return until
  // ResumeWorkers() or StopWorkers() has been called.  Number of distinct
  // calling threads must match the worker count (see AddWorkers() and
  // RemoveSelf()).
  bool ContinueRunning(bool *paused);

  // This is a hack!  It's like ContinueRunning(), except it won't pause.  If
  // any worker threads use this exclusively in place of ContinueRunning() then
  // PauseWorkers() should never be used!
  bool ContinueRunningNoPause();

 private:
  enum Status { RUN, PAUSE, STOP };

  void WaitOnPauseBarrier() {
#ifdef HAVE_PTHREAD_BARRIERS
    int error = pthread_barrier_wait(&pause_barrier_);
    if (error != PTHREAD_BARRIER_SERIAL_THREAD)
      sat_assert(error == 0);
#endif
  }

  void AcquireNumWorkersLock() {
    sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
  }

  void ReleaseNumWorkersLock() {
    sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
  }

  void AcquireStatusReadLock() {
    sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
  }

  void AcquireStatusWriteLock() {
    sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
  }

  void ReleaseStatusLock() {
    sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
  }

  Status GetStatus() {
    AcquireStatusReadLock();
    Status status = status_;
    ReleaseStatusLock();
    return status;
  }

  // Returns the previous status.
  Status SetStatus(Status status) {
    AcquireStatusWriteLock();
    Status prev_status = status_;
    status_ = status;
    ReleaseStatusLock();
    return prev_status;
  }

  pthread_mutex_t num_workers_mutex_;
  int num_workers_;

  pthread_rwlock_t status_rwlock_;
  Status status_;

#ifdef HAVE_PTHREAD_BARRIERS
  // Guaranteed to not be in use when (status_ != PAUSE).
  pthread_barrier_t pause_barrier_;
#endif

  DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
};


// This is a base class for worker threads.
// Each thread repeats a specific
// task on various blocks of memory.
class WorkerThread {
 public:
  // Enum to mark a thread as low/med/high priority.
  enum Priority {
    Low,
    Normal,
    High,
  };
  WorkerThread();
  virtual ~WorkerThread();

  // Initialize values and thread ID number.
  virtual void InitThread(int thread_num_init,
                          class Sat *sat_init,
                          class OsLayer *os_init,
                          class PatternList *patternlist_init,
                          WorkerStatus *worker_status);

  // This function is DEPRECATED, it does nothing.
  void SetPriority(Priority priority) { priority_ = priority; }
  // Spawn the worker thread, by running Work().
  int SpawnThread();
  // Only for ThreadSpawnerGeneric().
  void StartRoutine();
  bool InitPriority();

  // Wait for the thread to complete its cleanup.
  virtual bool JoinThread();
  // Kill worker thread with SIGINT.
  virtual bool KillThread();

  // This is the task function that the thread executes.
  // This is implemented per subclass.
  virtual bool Work();

  // Starts per-WorkerThread timer.
  void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
  // Reads current timer value and returns run duration without recording it.
  int64 ReadThreadTimer() {
    struct timeval end_time_;
    gettimeofday(&end_time_, NULL);
    return (end_time_.tv_sec - start_time_.tv_sec)*1000000ULL +
      (end_time_.tv_usec - start_time_.tv_usec);
  }
  // Stops per-WorkerThread timer and records thread run duration.
  // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
  // is effectively paused and restarted, so runduration_usec accumulates on.
  void StopThreadTimer() {
    runduration_usec_ += ReadThreadTimer();
  }

  // Acccess member variables.
  bool GetStatus() {return status_;}
  int64 GetErrorCount() {return errorcount_;}
  int64 GetPageCount() {return pages_copied_;}
  int64 GetRunDurationUSec() {return runduration_usec_;}

  // Returns bandwidth defined as pages_copied / thread_run_durations.
  virtual float GetCopiedData();
  // Calculate worker thread specific copied data.
  virtual float GetMemoryCopiedData() {return 0;}
  virtual float GetDeviceCopiedData() {return 0;}
  // Calculate worker thread specific bandwidth.
  virtual float GetMemoryBandwidth()
    {return GetMemoryCopiedData() / (
        runduration_usec_ * 1.0 / 1000000.);}
  virtual float GetDeviceBandwidth()
    {return GetDeviceCopiedData() / (
        runduration_usec_ * 1.0 / 1000000.);}

  void set_cpu_mask(cpu_set_t *mask) {
    memcpy(&cpu_mask_, mask, sizeof(*mask));
  }

  void set_cpu_mask_to_cpu(int cpu_num) {
    cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1);
  }

  void set_tag(int32 tag) {tag_ = tag;}

  // Returns CPU mask, where each bit represents a logical cpu.
  bool AvailableCpus(cpu_set_t *cpuset);
  // Returns CPU mask of CPUs this thread is bound to,
  bool CurrentCpus(cpu_set_t *cpuset);
  // Returns Current Cpus mask as string.
  string CurrentCpusFormat() {
    cpu_set_t current_cpus;
    CurrentCpus(&current_cpus);
    return cpuset_format(&current_cpus);
  }

  int ThreadID() {return thread_num_;}

  // Bind worker thread to specified CPU(s)
  bool BindToCpus(const cpu_set_t *cpuset);

 protected:
  // This function dictates whether the main work loop
  // continues, waits, or terminates.
  // All work loops should be of the form:
  //   do {
  //     // work.
  //   } while (IsReadyToRun());
  virtual bool IsReadyToRun(bool *paused = NULL) {
    return worker_status_->ContinueRunning(paused);
  }

  // Like IsReadyToRun(), except it won't pause.
  virtual bool IsReadyToRunNoPause() {
    return worker_status_->ContinueRunningNoPause();
  }

  // These are functions used by the various work loops.
  // Pretty print and log a data miscompare.
  virtual void ProcessError(struct ErrorRecord *er,
                            int priority,
                            const char *message);

  // Compare a region of memory with a known data patter, and report errors.
  virtual int CheckRegion(void *addr,
                          class Pattern *pat,
                          uint32 lastcpu,
                          int64 length,
                          int offset,
                          int64 patternoffset);

  // Fast compare a block of memory.
  virtual int CrcCheckPage(struct page_entry *srcpe);

  // Fast copy a block of memory, while verifying correctness.
  virtual int CrcCopyPage(struct page_entry *dstpe,
                          struct page_entry *srcpe);

  // Fast copy a block of memory, while verifying correctness, and heating CPU.
  virtual int CrcWarmCopyPage(struct page_entry *dstpe,
                              struct page_entry *srcpe);

  // Fill a page with its specified pattern.
  virtual bool FillPage(struct page_entry *pe);

  // Copy with address tagging.
  virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
                                uint64 *srcmem64,
                                unsigned int size_in_bytes,
                                AdlerChecksum *checksum,
                                struct page_entry *pe);
  // SSE copy with address tagging.
  virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64,
                                   uint64 *srcmem64,
                                   unsigned int size_in_bytes,
                                   AdlerChecksum *checksum,
                                   struct page_entry *pe);
  // Crc data with address tagging.
  virtual bool AdlerAddrCrcC(uint64 *srcmem64,
                             unsigned int size_in_bytes,
                             AdlerChecksum *checksum,
                             struct page_entry *pe);
  // Setup tagging on an existing page.
  virtual bool TagAddrC(uint64 *memwords,
                        unsigned int size_in_bytes);
  // Report a mistagged cacheline.
  virtual bool ReportTagError(uint64 *mem64,
                      uint64 actual,
                      uint64 tag);
  // Print out the error record of the tag mismatch.
  virtual void ProcessTagError(struct ErrorRecord *error,
                       int priority,
                       const char *message);

  // A worker thread can yield itself to give up CPU until it's scheduled again
  bool YieldSelf();

 protected:
  // General state variables that all subclasses need.
  int thread_num_;                  // Thread ID.
  volatile bool status_;            // Error status.
  volatile int64 pages_copied_;     // Recorded for memory bandwidth calc.
  volatile int64 errorcount_;       // Miscompares seen by this thread.

  cpu_set_t cpu_mask_;              // Cores this thread is allowed to run on.
  volatile uint32 tag_;             // Tag hint for memory this thread can use.

  bool tag_mode_;                   // Tag cachelines with vaddr.

  // Thread timing variables.
  struct timeval start_time_;        // Worker thread start time.
  volatile int64 runduration_usec_;  // Worker run duration in u-seconds.

  // Function passed to pthread_create.
  void *(*thread_spawner_)(void *args);
  pthread_t thread_;                // Pthread thread ID.
  Priority priority_;               // Worker thread priority.
  class Sat *sat_;                  // Reference to parent stest object.
  class OsLayer *os_;               // Os abstraction: put hacks here.
  class PatternList *patternlist_;  // Reference to data patterns.

  // Work around style guide ban on sizeof(int).
  static const uint64 iamint_ = 0;
  static const int wordsize_ = sizeof(iamint_);

 private:
  WorkerStatus *worker_status_;

  DISALLOW_COPY_AND_ASSIGN(WorkerThread);
};

// Worker thread to perform File IO.
class FileThread : public WorkerThread {
 public:
  FileThread();
  // Set filename to use for file IO.
  virtual void SetFile(const char *filename_init);
  virtual bool Work();

  // Calculate worker thread specific bandwidth.  Device data is reported as
  // twice the copied data (NOTE(review): presumably one write plus one read
  // of every page).
  virtual float GetDeviceCopiedData()
    {return GetCopiedData()*2;}
  virtual float GetMemoryCopiedData();

 protected:
  // Record of where these pages were sourced from, and what
  // potentially broken components they passed through.
  struct PageRec {
     class Pattern *pattern;  // This is the data it should contain.
     void *src;  // This is the memory location the data was sourced from.
     void *dst;  // This is where it ended up.
  };

  // These are functions used by the various work loops.
  // Pretty print and log a data miscompare. Disks require
  // slightly different error handling.
  virtual void ProcessError(struct ErrorRecord *er,
                            int priority,
                            const char *message);

  virtual bool OpenFile(int *pfile);
  virtual bool CloseFile(int fd);

  // Read and write whole file to disk.
  virtual bool WritePages(int fd);
  virtual bool ReadPages(int fd);

  // Read and write pages to disk.
  virtual bool WritePageToFile(int fd, struct page_entry *src);
  virtual bool ReadPageFromFile(int fd, struct page_entry *dst);

  // Sector tagging support.
  virtual bool SectorTagPage(struct page_entry *src, int block);
  virtual bool SectorValidatePage(const struct PageRec &page,
                                  struct page_entry *dst,
                                  int block);

  // Set up the memory used for data transfers (counterpart of PageTeardown()).
  virtual bool PagePrepare();
  // Remove memory allocated for data transfer.
  virtual bool PageTeardown();

  // Get memory for an incoming data transfer.
  virtual bool GetEmptyPage(struct page_entry *dst);
  // Get memory for an outgoing data transfer.
  virtual bool GetValidPage(struct page_entry *dst);
  // Throw out a used empty page.
  virtual bool PutEmptyPage(struct page_entry *src);
  // Throw out a used, filled page.
  virtual bool PutValidPage(struct page_entry *src);


  struct PageRec *page_recs_;          // Array of page records.
  int crc_page_;                        // Page currently being CRC checked.
  string filename_;                     // Name of file to access.
  string devicename_;                   // Name of device file is on.

  bool page_io_;                        // Use page pool for IO.
  void *local_page_;                   // malloc'd page for non-pool IO.
  int pass_;                            // Number of writes to the file so far.

  // Tag to detect file corruption.  Four one-byte header fields followed by
  // 512-4 bytes of padding, so the struct spans 512 bytes.
  // NOTE(review): presumably one tag is written per 512-byte sector.
  struct SectorTag {
    volatile uint8 magic;
    volatile uint8 block;
    volatile uint8 sector;
    volatile uint8 pass;
    char pad[512-4];
  };

  DISALLOW_COPY_AND_ASSIGN(FileThread);
};


// Worker thread to perform Network IO.
class NetworkThread : public WorkerThread {
 public:
  NetworkThread();
  // Set the IP address (kept in the 256-byte ipaddr_ buffer) to use for
  // network IO.
  virtual void SetIP(const char *ipaddr_init);
  virtual bool Work();

  // Calculate worker thread specific bandwidth.  Device data is reported as
  // twice the copied data (NOTE(review): presumably one send plus one receive
  // per page).
  virtual float GetDeviceCopiedData()
    {return GetCopiedData()*2;}

 protected:
  // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
  virtual bool IsNetworkStopSet();
  // Socket helpers; each returns a success flag.
  virtual bool CreateSocket(int *psocket);
  virtual bool CloseSocket(int sock);
  virtual bool Connect(int sock);
  virtual bool SendPage(int sock, struct page_entry *src);
  virtual bool ReceivePage(int sock, struct page_entry *dst);
  char ipaddr_[256];  // Peer address set by SetIP().
  int sock_;          // Socket descriptor used by this thread.

 private:
  DISALLOW_COPY_AND_ASSIGN(NetworkThread);
};

// Worker thread to reflect Network IO back over an already-connected socket.
// NetworkListenThread creates these for incoming connections.
class NetworkSlaveThread : public NetworkThread {
 public:
  NetworkSlaveThread();
  // Set socket for IO.
  virtual void SetSock(int sock);
  virtual bool Work();

 protected:
  // Overrides NetworkThread's stop check.
  virtual bool IsNetworkStopSet();

 private:
  DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
};

// Worker thread to detect incoming Network IO.  Each accepted connection is
// serviced by a NetworkSlaveThread with its own WorkerStatus (see
// ChildWorker).
class NetworkListenThread : public NetworkThread {
 public:
  NetworkListenThread();
  virtual bool Work();

 private:
  virtual bool Listen();
  virtual bool Wait();
  virtual bool GetConnection(int *pnewsock);
  virtual bool SpawnSlave(int newsock, int threadid);
  virtual bool ReapSlaves();

  // For serviced incoming connections: a slave thread paired with the
  // WorkerStatus that controls it.
  struct ChildWorker {
    WorkerStatus status;
    NetworkSlaveThread thread;
  };
  typedef vector<ChildWorker*> ChildVector;
  // NOTE(review): presumably filled by SpawnSlave() and freed by
  // ReapSlaves(); confirm ownership in worker.cc.
  ChildVector child_workers_;

  DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
};

// Memory copy worker.  Its reported memory traffic is double the copied
// data (presumably one read plus one write per page).
class CopyThread : public WorkerThread {
 public:
  CopyThread() {}
  virtual bool Work();

  // Worker specific memory data: twice GetCopiedData().
  virtual float GetMemoryCopiedData() {
    return 2 * GetCopiedData();
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(CopyThread);
};

// Memory invert worker.
class InvertThread : public WorkerThread {
 public:
  InvertThread() {}
  virtual bool Work();

  // Worker specific memory data: four times GetCopiedData().
  virtual float GetMemoryCopiedData() {
    return 4 * GetCopiedData();
  }

 private:
  // Page inversion passes; NOTE(review): names suggest ascending and
  // descending address order, confirm in worker.cc.
  virtual int InvertPageUp(struct page_entry *srcpe);
  virtual int InvertPageDown(struct page_entry *srcpe);

  DISALLOW_COPY_AND_ASSIGN(InvertThread);
};

// Worker thread to fill blank pages on startup.
class FillThread : public WorkerThread {
 public:
  FillThread();
  // Set how many pages this thread should fill before exiting.
  virtual void SetFillPages(int64 num_pages_to_fill_init);
  virtual bool Work();

 private:
  // Fill a page with the data pattern in pe->pattern.
  // NOTE(review): the name suggests the pattern is picked at random; confirm
  // whether that happens here or in the caller.
  virtual bool FillPageRandom(struct page_entry *pe);
  int64 num_pages_to_fill_;  // Page count set by SetFillPages().
  DISALLOW_COPY_AND_ASSIGN(FillThread);
};

// Verification worker: checks that page data matches its pattern data.
// Pages are checked and put back until the "done" flag is set; after that
// they are checked and discarded until none remain.
class CheckThread : public WorkerThread {
 public:
  CheckThread() {}
  virtual bool Work();

  // Worker specific memory data equals the copied data.
  virtual float GetMemoryCopiedData() {
    float copied = GetCopiedData();
    return copied;
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(CheckThread);
};


// Worker thread to poll for system error messages.
// Thread will check for messages until the "done" flag is set.
class ErrorPollThread : public WorkerThread {
 public:
  ErrorPollThread() {}
  // Polling loop; see worker.cc.
  virtual bool Work();

 private:
  DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
};

// Computation intensive worker thread to stress the CPU.
class CpuStressThread : public WorkerThread {
 public:
  CpuStressThread() {}
  // Compute loop; see worker.cc.
  virtual bool Work();

 private:
  DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
};

// Worker thread that tests the correctness of the
// CPU Cache Coherency Protocol.
class CpuCacheCoherencyThread : public WorkerThread {
 public:
  // Parameters correspond to the cc_* data members below.  They are named
  // without the member-style trailing underscore so they are not mistaken
  // for those members.
  CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
                          int cc_cacheline_count,
                          int cc_thread_num,
                          int cc_thread_count,
                          int cc_inc_count);
  virtual bool Work();

 protected:
  // Used by the simple random number generator as a shift feedback;
  // this polynomial (x^64 + x^63 + x^61 + x^60 + 1) will produce a
  // pseudorandom cycle of period 2^64-1.
  static const uint64 kRandomPolynomial = 0xD800000000000000ULL;
  // A very simple pseudorandom generator that can be inlined and use
  // registers, to keep the CC test loop tight and focused.
  static uint64 SimpleRandom(uint64 seed);

  cc_cacheline_data *cc_cacheline_data_;  // Data structure for each cacheline.
  int cc_local_num_;        // Local counter for each thread.
  int cc_cacheline_count_;  // Number of cache lines to operate on.
  int cc_thread_num_;       // The integer id of the thread which is
                            // used as an index into the integer array
                            // of the cacheline data structure.
  int cc_thread_count_;     // Total number of threads being run, for
                            // calculations mixing up cache line access.
  int cc_inc_count_;        // Number of times to increment the counter.

 private:
  DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
};

// Worker thread to perform disk test.
class DiskThread : public WorkerThread {
 public:
  explicit DiskThread(DiskBlockTable *block_table);
  virtual ~DiskThread();

  // Disk data transferred, in megabytes: blocks written and read, each
  // weighted by its block size.  The byte total is converted to floating
  // point before dividing by kMegabyte, so fractional megabytes are kept
  // regardless of kMegabyte's type.  Integer division would truncate to
  // whole megabytes.
  virtual float GetDeviceCopiedData() {
    double bytes =
        static_cast<double>(blocks_written_) * write_block_size_ +
        static_cast<double>(blocks_read_) * read_block_size_;
    return static_cast<float>(bytes / kMegabyte);
  }

  // Set filename for device file (in /dev).
  virtual void SetDevice(const char *device_name);
  // Set various parameters that control the behaviour of the test.
  virtual bool SetParameters(int read_block_size,
                             int write_block_size,
                             int64 segment_size,
                             int64 cache_size,
                             int blocks_per_segment,
                             int64 read_threshold,
                             int64 write_threshold,
                             int non_destructive);

  virtual bool Work();

  // Disk threads report no memory-side copied data.
  virtual float GetMemoryCopiedData() {return 0;}

 protected:
  static const int kSectorSize = 512;       // Size of sector on disk.
  static const int kBufferAlignment = 512;  // Buffer alignment required by the
                                            // kernel.
  static const int kBlockRetry = 100;       // Number of retries to allocate
                                            // sectors.

  enum IoOp {
    ASYNC_IO_READ   = 0,
    ASYNC_IO_WRITE  = 1
  };

  virtual bool OpenDevice(int *pfile);
  virtual bool CloseDevice(int fd);

  // Retrieves the size (in bytes) of the disk/file.
  virtual bool GetDiskSize(int fd);

  // Retrieves the current time in microseconds.
  virtual int64 GetTime();

  // Do an asynchronous disk I/O operation.
  virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
                           int64 offset, int64 timeout);

  // Write a block to disk.
  virtual bool WriteBlockToDisk(int fd, BlockData *block);

  // Verify a block on disk.
  virtual bool ValidateBlockOnDisk(int fd, BlockData *block);

  // Main work loop.
  virtual bool DoWork(int fd);

  int read_block_size_;       // Size of blocks read from disk, in bytes.
  int write_block_size_;      // Size of blocks written to disk, in bytes.
  int64 blocks_read_;         // Number of blocks read in work loop.
  int64 blocks_written_;      // Number of blocks written in work loop.
  int64 segment_size_;        // Size of disk segments (in bytes) that the disk
                              // will be split into where testing can be
                              // confined to a particular segment.
                              // Allows for control of how evenly the disk will
                              // be tested.  Smaller segments imply more even
                              // testing (less random).
  int blocks_per_segment_;    // Number of blocks that will be tested per
                              // segment.
  int cache_size_;            // Size of disk cache, in bytes.
                              // NOTE(review): SetParameters() takes cache_size
                              // as int64; caches >= 2 GiB would not fit here.
  int queue_size_;            // Length of in-flight-blocks queue, in blocks.
  int non_destructive_;       // Use non-destructive mode or not.
  int update_block_table_;    // If true, assume this is the thread
                              // responsible for writing the data in the disk
                              // for this block device and, therefore,
                              // update the block table. If false, just use
                              // the block table to get data.

  // read/write times threshold for reporting a problem
  int64 read_threshold_;      // Maximum time a read should take (in us) before
                              // a warning is given.
  int64 write_threshold_;     // Maximum time a write should take (in us) before
                              // a warning is given.
  int64 read_timeout_;        // Maximum time a read can take before a timeout
                              // and the aborting of the read operation.
  int64 write_timeout_;       // Maximum time a write can take before a timeout
                              // and the aborting of the write operation.

  string device_name_;        // Name of device file to access.
  int64 device_sectors_;      // Number of sectors on the device.

  std::queue<BlockData*> in_flight_sectors_;   // Queue of sectors written but
                                                // not verified.
  void *block_buffer_;        // Pointer to aligned block buffer.

#ifdef HAVE_LIBAIO_H
  io_context_t aio_ctx_;     // Asynchronous I/O context for Linux native AIO.
#endif

  DiskBlockTable *block_table_;  // Disk Block Table, shared by all disk
                                 // threads that read / write at the same
                                 // device

  DISALLOW_COPY_AND_ASSIGN(DiskThread);
};

// Disk test thread that supplies its own main work loop (DoWork()).
// NOTE(review): the name suggests random block selection; confirm in
// worker.cc.
class RandomDiskThread : public DiskThread {
 public:
  explicit RandomDiskThread(DiskBlockTable *block_table);
  virtual ~RandomDiskThread();
  // Main work loop; overrides DiskThread::DoWork().
  virtual bool DoWork(int fd);

 private:
  // Private, matching every other worker class in this header.
  DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
};

// Worker thread to perform checks in a specific memory region.
class MemoryRegionThread : public WorkerThread {
 public:
  MemoryRegionThread();
  virtual ~MemoryRegionThread();
  virtual bool Work();
  // Pretty print and log a data miscompare found in this region.
  virtual void ProcessError(struct ErrorRecord *error, int priority,
                            const char *message);
  // Set the memory region (start pointer and size in bytes) to operate on.
  // NOTE(review): presumably returns false if the region cannot be used.
  bool SetRegion(void *region, int64 size);
  // Calculate worker thread specific bandwidth.
  virtual float GetMemoryCopiedData()
    {return GetCopiedData();}
  virtual float GetDeviceCopiedData()
    {return GetCopiedData() * 2;}
  // Set the label stored in identifier_.  Taken by const reference to avoid
  // an extra string copy; existing callers are unaffected.
  void SetIdentifier(const string &identifier) {
    identifier_ = identifier;
  }

 protected:
  // Page queue for this particular memory region.
  char *region_;
  PageEntryQueue *pages_;
  bool error_injection_;
  int phase_;                             // One of the kPhase* values below.
  string identifier_;
  static const int kPhaseNoPhase = 0;
  static const int kPhaseCopy = 1;
  static const int kPhaseCheck = 2;

 private:
  DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
};

// Worker thread to check that the frequency of every cpu does not go below a
// certain threshold, using the TSC, APERF and MPERF model-specific registers.
class CpuFreqThread : public WorkerThread {
 public:
  CpuFreqThread(int num_cpus, int freq_threshold, int round);
  ~CpuFreqThread();

  // This is the task function that the thread executes.
  virtual bool Work();

  // Returns true if this test can run on the current machine. Otherwise,
  // returns false.
  static bool CanRun();

 private:
  static const int kIntervalPause = 10;   // The number of seconds to pause
                                          // between acquiring the MSR data.
  static const int kStartupDelay = 5;     // The number of seconds to wait
                                          // before acquiring MSR data.
  static const int kMsrTscAddr = 0x10;    // The address of the TSC MSR.
  static const int kMsrAperfAddr = 0xE8;  // The address of the APERF MSR.
  static const int kMsrMperfAddr = 0xE7;  // The address of the MPERF MSR.

  // The index values into the CpuDataType.msrs[] array.
  enum MsrValues {
    kMsrTsc = 0,           // MSR index 0 = TSC.
    kMsrAperf = 1,         // MSR index 1 = APERF.
    kMsrMperf = 2,         // MSR index 2 = MPERF.
    kMsrLast,              // Number of MSRs tracked (array size, not an index).
  };

  typedef struct {
    uint32 msr;         // The address of the MSR.
    const char *name;   // A human readable string for the MSR.
  } CpuRegisterType;

  typedef struct {
    uint64 msrs[kMsrLast];  // The values of the MSRs.
    struct timeval tv;      // The time at which the MSRs were read.
  } CpuDataType;

  // The set of MSR addresses and register names.
  static const CpuRegisterType kCpuRegisters[kMsrLast];

  // Compute the change in values of the MSRs between current and previous,
  // set the frequency in MHz of the cpu. If there is an error computing
  // the delta, return false. Otherwise, return true.
  bool ComputeFrequency(CpuDataType *current, CpuDataType *previous,
                        int *frequency);

  // Get the MSR values for this particular cpu and save them in data. If
  // any error is encountered, returns false. Otherwise, returns true.
  bool GetMsrs(int cpu, CpuDataType *data);

  // Compute the difference between the currently read MSR values and the
  // previously read values and store the results in delta. If any of the
  // values did not increase, or the TSC value is too small, returns false.
  // Otherwise, returns true.
  bool ComputeDelta(CpuDataType *current, CpuDataType *previous,
                    CpuDataType *delta);

  // The total number of cpus on the system.
  int num_cpus_;

  // The minimum frequency that each cpu must operate at (in MHz).
  int freq_threshold_;

  // The value to round the computed frequency to.
  int round_;

  // Precomputed value to add to the frequency to do the rounding.
  double round_value_;

  DISALLOW_COPY_AND_ASSIGN(CpuFreqThread);
};

#endif  // STRESSAPPTEST_WORKER_H_