netcache_client.py 8.9 KB

#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2011-2015 (ita)

"""
A client for the network cache (playground/netcache/). Launch the server with:

	./netcache_server

then use it for the builds by adding the following:

	def build(bld):
		bld.load('netcache_client')

The parameters should be present in the environment in the form:

	NETCACHE=host:port waf configure build

Or in a more detailed way:

	NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build

where:
	host: host where the server resides, by default localhost
	port: by default push on 11001 and pull on 12001

Use the server provided in playground/netcache/Netcache.java
"""
import os, socket, time, atexit, sys
from waflib import Task, Logs, Utils, Build, Runner
from waflib.Configure import conf

BUF = 8192 * 16
HEADER_SIZE = 128
MODES = ['PUSH', 'PULL', 'PUSH_PULL']
STALE_TIME = 30 # seconds

GET = 'GET'
PUT = 'PUT'
LST = 'LST'
BYE = 'BYE'

all_sigs_in_cache = (0.0, [])

def put_data(conn, data):
	if sys.hexversion > 0x3000000:
		data = data.encode('latin-1')
	cnt = 0
	while cnt < len(data):
		sent = conn.send(data[cnt:])
		if sent == 0:
			raise RuntimeError('connection ended')
		cnt += sent

push_connections = Runner.Queue(0)
pull_connections = Runner.Queue(0)

def get_connection(push=False):
	# return a new connection... do not forget to release it!
	try:
		if push:
			ret = push_connections.get(block=False)
		else:
			ret = pull_connections.get(block=False)
	except Exception:
		ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		if push:
			ret.connect(Task.push_addr)
		else:
			ret.connect(Task.pull_addr)
	return ret

def release_connection(conn, msg='', push=False):
	if conn:
		if push:
			push_connections.put(conn)
		else:
			pull_connections.put(conn)

def close_connection(conn, msg=''):
	if conn:
		data = '%s,%s' % (BYE, msg)
		try:
			put_data(conn, data.ljust(HEADER_SIZE))
		except:
			pass
		try:
			conn.close()
		except:
			pass

def close_all():
	for q in (push_connections, pull_connections):
		while q.qsize():
			conn = q.get()
			try:
				close_connection(conn)
			except:
				# ignore errors when cleaning up
				pass
atexit.register(close_all)

def read_header(conn):
	cnt = 0
	buf = []
	while cnt < HEADER_SIZE:
		data = conn.recv(HEADER_SIZE - cnt)
		if not data:
			#import traceback
			#traceback.print_stack()
			raise ValueError('connection ended when reading a header %r' % buf)
		buf.append(data)
		cnt += len(data)
	if sys.hexversion > 0x3000000:
		ret = ''.encode('latin-1').join(buf)
		ret = ret.decode('latin-1')
	else:
		ret = ''.join(buf)
	return ret
def check_cache(conn, ssig):
	"""
	List the files on the server; this is an optimization that assumes
	concurrent builds are rare.
	"""
	global all_sigs_in_cache
	if not STALE_TIME:
		return
	if time.time() - all_sigs_in_cache[0] > STALE_TIME:

		params = (LST, '')
		put_data(conn, ','.join(params).ljust(HEADER_SIZE))

		# read what is coming back
		ret = read_header(conn)
		size = int(ret.split(',')[0])

		buf = []
		cnt = 0
		while cnt < size:
			data = conn.recv(min(BUF, size-cnt))
			if not data:
				raise ValueError('connection ended %r %r' % (cnt, size))
			buf.append(data)
			cnt += len(data)

		if sys.hexversion > 0x3000000:
			ret = ''.encode('latin-1').join(buf)
			ret = ret.decode('latin-1')
		else:
			ret = ''.join(buf)

		all_sigs_in_cache = (time.time(), ret.splitlines())
		Logs.debug('netcache: server cache has %r entries', len(all_sigs_in_cache[1]))

	if not ssig in all_sigs_in_cache[1]:
		raise ValueError('no file %s in cache' % ssig)

class MissingFile(Exception):
	pass
def recv_file(conn, ssig, count, p):
	check_cache(conn, ssig)

	params = (GET, ssig, str(count))
	put_data(conn, ','.join(params).ljust(HEADER_SIZE))
	data = read_header(conn)

	size = int(data.split(',')[0])
	if size == -1:
		raise MissingFile('no file %s - %s in cache' % (ssig, count))

	# get the file, writing immediately
	# TODO a tmp file would be better
	f = open(p, 'wb')
	cnt = 0
	while cnt < size:
		data = conn.recv(min(BUF, size-cnt))
		if not data:
			raise ValueError('connection ended %r %r' % (cnt, size))
		f.write(data)
		cnt += len(data)
	f.close()

def sock_send(conn, ssig, cnt, p):
	#print "pushing %r %r %r" % (ssig, cnt, p)
	size = os.stat(p).st_size
	params = (PUT, ssig, str(cnt), str(size))
	put_data(conn, ','.join(params).ljust(HEADER_SIZE))
	f = open(p, 'rb')
	cnt = 0
	while cnt < size:
		r = f.read(min(BUF, size-cnt))
		while r:
			k = conn.send(r)
			if not k:
				raise ValueError('connection ended')
			cnt += k
			r = r[k:]
	f.close()
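
# Wire protocol sketch (inferred from the helpers above, not from a separate
# specification): every exchange starts with a HEADER_SIZE-byte ASCII header,
# comma-separated and right-padded by ljust(), for example:
#
#	'LST,'.ljust(HEADER_SIZE)                        # list the cached signatures
#	'GET,<ssig>,<count>'.ljust(HEADER_SIZE)          # fetch output file <count> of task <ssig>
#	'PUT,<ssig>,<count>,<size>'.ljust(HEADER_SIZE)   # header is followed by <size> raw bytes
#	'BYE,<msg>'.ljust(HEADER_SIZE)                   # sent by close_connection() on shutdown
#
# Replies to LST and GET begin with another HEADER_SIZE header whose first
# comma-separated field is the payload size in bytes; a size of -1 on GET means
# the requested file is not in the cache (MissingFile above).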
def can_retrieve_cache(self):
	if not Task.pull_addr:
		return False
	if not self.outputs:
		return False
	self.cached = False

	cnt = 0
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	conn = None
	err = False
	try:
		try:
			conn = get_connection()
			for node in self.outputs:
				p = node.abspath()
				recv_file(conn, ssig, cnt, p)
				cnt += 1
		except MissingFile as e:
			Logs.debug('netcache: file is not in the cache %r', e)
			err = True
		except Exception as e:
			Logs.debug('netcache: could not get the files %r', self.outputs)
			if Logs.verbose > 1:
				Logs.debug('netcache: exception %r', e)
			err = True

			# broken connection? remove this one
			close_connection(conn)
			conn = None
		else:
			Logs.debug('netcache: obtained %r from cache', self.outputs)
	finally:
		release_connection(conn)
	if err:
		return False

	self.cached = True
	return True

@Utils.run_once
def put_files_cache(self):
	if not Task.push_addr:
		return
	if not self.outputs:
		return
	if getattr(self, 'cached', None):
		return

	#print "called put_files_cache", id(self)
	bld = self.generator.bld
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	conn = None
	cnt = 0
	try:
		for node in self.outputs:
			# We could re-create the signature of the task with the signature of the outputs
			# in practice, this means hashing the output files
			# this is unnecessary
			try:
				if not conn:
					conn = get_connection(push=True)
				sock_send(conn, ssig, cnt, node.abspath())
				Logs.debug('netcache: sent %r', node)
			except Exception as e:
				Logs.debug('netcache: could not push the files %r', e)

				# broken connection? remove this one
				close_connection(conn)
				conn = None
			cnt += 1
	finally:
		release_connection(conn, push=True)

	bld.task_sigs[self.uid()] = self.cache_sig

def hash_env_vars(self, env, vars_lst):
	# reimplement so that the resulting hash does not depend on local paths
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)
	cache[idx] = ret
	return ret

def uid(self):
	# reimplement so that the signature does not depend on local paths
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_

def make_cached(cls):
	if getattr(cls, 'nocache', None):
		return

	m1 = cls.run
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = cls.post_run
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		bld = self.generator.bld
		ret = m2(self)
		if bld.cache_global:
			self.put_files_cache()
		if hasattr(self, 'chmod'):
			for node in self.outputs:
				os.chmod(node.abspath(), self.chmod)
		return ret
	cls.post_run = post_run

@conf
def setup_netcache(ctx, push_addr, pull_addr):
	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Task.push_addr = push_addr
	Task.pull_addr = pull_addr
	Build.BuildContext.hash_env_vars = hash_env_vars
	ctx.cache_global = True

	for x in Task.classes.values():
		make_cached(x)
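
# Note: make_cached() above is applied to every registered task class; a task
# class or a task instance can opt out of the cache by carrying a truthy
# 'nocache' attribute, as checked in make_cached(), run() and post_run().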
def build(bld):
	if not 'NETCACHE' in os.environ and not 'NETCACHE_PULL' in os.environ and not 'NETCACHE_PUSH' in os.environ:
		Logs.warn('Setting NETCACHE_PULL=127.0.0.1:12001 and NETCACHE_PUSH=127.0.0.1:11001')
		os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
		os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'

	if 'NETCACHE' in os.environ:
		if not 'NETCACHE_PUSH' in os.environ:
			os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
		if not 'NETCACHE_PULL' in os.environ:
			os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']

	v = os.environ['NETCACHE_PULL']
	if v:
		h, p = v.split(':')
		pull_addr = (h, int(p))
	else:
		pull_addr = None

	v = os.environ['NETCACHE_PUSH']
	if v:
		h, p = v.split(':')
		push_addr = (h, int(p))
	else:
		push_addr = None

	setup_netcache(bld, push_addr, pull_addr)
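
# Example invocations (illustrative; the host name is a placeholder):
#
#	NETCACHE=cache-host:11001 waf configure build
#	NETCACHE_PUSH=cache-host:11001 NETCACHE_PULL=cache-host:12001 waf configure build
#
# With none of the variables set, build() above logs a warning and falls back to
# 127.0.0.1, pushing on port 11001 and pulling on port 12001.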