Tag Archives: concurrency

Nonintrusive concurrency with Clojure

最近对Clojure比较感兴趣,写了一个quick sort做测试。

单线程版:

(defn qsort-seq
  "Single-threaded quicksort over the vector `seq`.

  Inputs of 25 elements or fewer are handed to `selection-sort` and its
  result is returned as-is. Larger inputs are split with `qsort-partition`
  around the element at index 0, each half is sorted recursively, and the
  two sorted halves are joined with `concat` (so the result is a lazy seq).

  Termination relies on `qsort-partition` returning halves that are each
  smaller than the input."
  [seq]
  ;; `count` replaces the reflective `.length` interop call.
  (if (<= (count seq) 25)
    (selection-sort seq)
    (let [[first-half second-half] (qsort-partition seq 0)]
      (concat (qsort-seq first-half)
              (qsort-seq second-half)))))

多线程版:

(defn qsort
  "Parallel quicksort over the vector `seq`.

  Inputs of 25 elements or fewer are handed to `selection-sort`. Larger
  inputs are split with `qsort-partition` around the element at index 0;
  the first half is sorted inside a `future` while the calling thread sorts
  the second half. Dereferencing the future blocks until it has finished,
  after which the two sorted halves are joined with `concat` (lazy seq).

  Termination relies on `qsort-partition` returning halves that are each
  smaller than the input."
  [seq]
  ;; `count` replaces the reflective `.length` interop call.
  (if (<= (count seq) 25)
    (selection-sort seq)
    (let [[first-half second-half] (qsort-partition seq 0)
          ;; Start the first half on another thread before sorting the
          ;; second half here, so the two halves can run concurrently.
          sorted-first-half (future (qsort first-half))
          sorted-second-half (qsort second-half)]
      (concat @sorted-first-half sorted-second-half))))

可以看到两个版本基本一样:多线程版只是把前半部分的排序放进 future,并在合并时用 @ 取得结果。所以在Clojure里通常可以很快把单线程的程序并行化。下面是上面用到的辅助函数(加载代码时需要把它们定义在 qsort-seq 和 qsort 之前):

;; Shared pseudo-random number generator used to build the test data.
(def ^java.util.Random random (java.util.Random.))

(defn gen_rand_ints
  "Returns a vector of `n` pseudo-random ints drawn from the shared `random`
  generator via `.nextInt`. A zero or negative `n` yields an empty vector
  (the previous hand-written loop never terminated for negative `n`)."
  [n]
  ;; The type hint avoids a reflective method lookup on every call.
  (vec (repeatedly n #(.nextInt ^java.util.Random random))))

(def values
  "Vector of 10000 random ints used as the benchmark input."
  (-> 10000 gen_rand_ints vec))

(defn swap
  "Returns a copy of `vec` in which the values stored at `i` and `j` have
  traded places. `vec` itself is not modified."
  [vec i j]
  ;; Read both slots up front (i first, then j) before writing either one.
  (let [at-i (vec i)
        at-j (vec j)]
    (-> vec
        (assoc i at-j)
        (assoc j at-i))))

(defn selection-sort
  "Sorts the vector `seq` in ascending order using selection sort and
  returns the sorted vector; `seq` itself is not modified.

  For each position `start`, the index of the smallest remaining element is
  found and that element is exchanged with the one at `start` via `swap`.
  An empty input is returned unchanged."
  [seq]
  (let [n (count seq)]
    ;; A local helper instead of a nested `defn`: a nested `defn` re-interns
    ;; a global var on every call, which is wasteful and also happens
    ;; concurrently when this function runs from several threads.
    (letfn [(index-of-smaller [v i j]
              ;; On ties the later index `j` wins.
              (if (< (v i) (v j)) i j))]
      (loop [seq- seq
             start 0]
        (if (< start n)
          (let [index-of-smallest (reduce (partial index-of-smaller seq-)
                                          (range start n))]
            (recur (swap seq- start index-of-smallest) (inc start)))
          seq-)))))

(defn qsort-partition
  "Splits `seq` around the element at `pivot_index` and returns a pair of
  vectors `[le gt]` such that every element of `le` is <= every element of
  `gt`, and together they contain exactly the elements of `seq`.

  Elements other than the pivot go to `le` when they are <= the pivot and
  to `gt` otherwise. The pivot itself is then placed so that neither half is
  the whole input whenever `seq` has at least two elements:
    - if nothing is greater than the pivot, `gt` becomes `[pivot]`;
    - otherwise the pivot is appended to `le`.
  Without this, an input whose pivot is its maximum (e.g. all elements
  equal) would come back unchanged in `le`, and a caller that recurses on
  both halves would never terminate.

  Throws IndexOutOfBoundsException when `pivot_index` is not a valid index
  (including for an empty `seq`)."
  [seq pivot_index]
  (let [pivot (nth seq pivot_index)
        n (count seq)
        [le gt] (loop [i 0, le [], gt []]
                  (cond
                    (>= i n) [le gt]
                    ;; The pivot is placed after the scan, not during it.
                    (= i pivot_index) (recur (inc i) le gt)
                    :else (let [x (nth seq i)]
                            (if (<= x pivot)
                              (recur (inc i) (conj le x) gt)
                              (recur (inc i) le (conj gt x))))))]
    (if (and (empty? gt) (pos? (count le)))
      [le [pivot]]
      [(conj le pivot) gt])))

比较一下两个版本对10000个数排序所用的时间:

;; Time both variants on the same input. Both return a lazy `concat` result
;; for inputs above the cutoff, so `doall` forces full realization inside
;; `time`; otherwise the final concatenation work is left out of the figure.
(time (doall (qsort-seq values)))
(time (doall (qsort values)))
;; Shut down the thread pools backing `future` so the JVM can exit promptly.
(shutdown-agents)

输出

"Elapsed time: 1532.536441 msecs"
"Elapsed time: 1121.529135 msecs"