aboutsummaryrefslogtreecommitdiff
path: root/llvm/include/llvm/Support/ThreadPool.h
blob: aecff122d3cbe52796716797f1d0168f83d8ec60 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_THREADPOOL_H
#define LLVM_SUPPORT_THREADPOOL_H

#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <future>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

namespace llvm {

/// A ThreadPool for asynchronous parallel execution on a defined number of
/// threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
class ThreadPool {
public:
  /// Construct a pool using the hardware strategy \p S for mapping hardware
  /// execution resources (threads, cores, CPUs)
  /// Defaults to using the maximum execution resources in the system, but
  /// accounting for the affinity mask.
  ThreadPool(ThreadPoolStrategy S = hardware_concurrency());

  /// Blocking destructor: the pool will wait for all the threads to complete.
  ~ThreadPool();

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  ///
  /// \p F is bound to \p ArgList with std::bind and the resulting nullary
  /// callable is forwarded to the single-argument overload below.
  template <typename Function, typename... Args>
  inline auto async(Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(std::move(Task));
  }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  ///
  /// \p F must be callable with no arguments; its result type becomes the
  /// value type of the returned shared_future.
  template <typename Func>
  auto async(Func &&F) -> std::shared_future<decltype(F())> {
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
  }

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  void wait();

  // TODO: misleading legacy name warning!
  // Returns the maximum number of worker threads in the pool, not the current
  // number of threads!
  unsigned getThreadCount() const { return MaxThreadCount; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Helpers to create a promise and a callable wrapper of \p Task that sets
  /// the result of the promise. Returns the callable and a future to access the
  /// result.
  ///
  /// \p Task is taken by value and moved into the wrapper, so submitting a
  /// task does not copy the callable (or any state it carries) again.
  template <typename ResTy>
  static std::pair<std::function<void()>, std::future<ResTy>>
  createTaskAndFuture(std::function<ResTy()> Task) {
    // The promise is held through a shared_ptr because the wrapper has to be
    // copyable in order to be stored in a std::function.
    std::shared_ptr<std::promise<ResTy>> Promise =
        std::make_shared<std::promise<ResTy>>();
    auto F = Promise->get_future();
    return {[Promise = std::move(Promise), Task = std::move(Task)]() {
              Promise->set_value(Task());
            },
            std::move(F)};
  }

  /// Overload for tasks that return nothing: runs \p Task and then marks the
  /// promise as ready without a value.
  static std::pair<std::function<void()>, std::future<void>>
  createTaskAndFuture(std::function<void()> Task) {
    std::shared_ptr<std::promise<void>> Promise =
        std::make_shared<std::promise<void>>();
    auto F = Promise->get_future();
    return {[Promise = std::move(Promise), Task = std::move(Task)]() {
              Task();
              Promise->set_value();
            },
            std::move(F)};
  }

  /// Returns true when no thread is marked active and the task queue is empty.
  /// This helper takes no lock itself; presumably the caller is expected to
  /// hold QueueLock (TODO confirm against the out-of-line implementation).
  bool workCompletedUnlocked() const {
    return !ActiveThreads && Tasks.empty();
  }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename ResTy>
  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {

#if LLVM_ENABLE_THREADS
    /// Wrap the Task in a std::function<void()> that sets the result of the
    /// corresponding future. Task is our own by-value copy and is not used
    /// again on this path, so hand it over instead of copying it.
    auto R = createTaskAndFuture(std::move(Task));

    int requestedThreads;
    {
      // Lock the queue and push the new task
      std::unique_lock<std::mutex> LockGuard(QueueLock);

      // Don't allow enqueueing after disabling the pool
      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
      Tasks.push(std::move(R.first));
      // Computed while the lock is held so both counters are read together;
      // grow() takes an int, hence the explicit conversion.
      requestedThreads = static_cast<int>(ActiveThreads + Tasks.size());
    }
    // Notify and grow after the queue lock has been released.
    QueueCondition.notify_one();
    grow(requestedThreads);
    return R.second.share();

#else // LLVM_ENABLE_THREADS Disabled

    // Get a Future with launch::deferred execution using std::async
    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
    // Wrap the future so that both ThreadPool::wait() can operate and the
    // returned future can be sync'ed on.
    Tasks.push([Future]() { Future.get(); });
    return Future;
#endif
  }

#if LLVM_ENABLE_THREADS
  // Grow to ensure that we have at least `requested` Threads, but do not go
  // over MaxThreadCount.
  void grow(int requested);
#endif

  /// Threads in flight
  std::vector<llvm::thread> Threads;
  /// Lock protecting access to the Threads vector.
  mutable std::mutex ThreadsLock;

  /// Tasks waiting for execution in the pool.
  std::queue<std::function<void()>> Tasks;

  /// Locking and signaling for accessing the Tasks queue.
  std::mutex QueueLock;
  std::condition_variable QueueCondition;

  /// Signaling for job completion
  std::condition_variable CompletionCondition;

  /// Keep track of the number of thread actually busy
  unsigned ActiveThreads = 0;

#if LLVM_ENABLE_THREADS // avoids warning for unused variable
  /// Signal for the destruction of the pool, asking thread to exit.
  bool EnableFlag = true;
#endif

  /// Strategy this pool was configured with.
  const ThreadPoolStrategy Strategy;

  /// Maximum number of threads to potentially grow this pool to.
  const unsigned MaxThreadCount;
};
}

#endif // LLVM_SUPPORT_THREADPOOL_H