dma_resolver.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. #!/usr/bin/env python
  2. import sys, fnmatch
  3. import importlib
  4. # peripheral types that can be shared, wildcard patterns
  5. SHARED_MAP = ["I2C*", "USART*_TX", "UART*_TX", "SPI*", "TIM*_UP"]
  6. ignore_list = []
  7. dma_map = None
  8. debug = False
  9. def check_possibility(periph, dma_stream, curr_dict, dma_map, check_list, recurse=False):
  10. global ignore_list
  11. for other_periph in curr_dict:
  12. if other_periph != periph:
  13. if curr_dict[other_periph] == dma_stream:
  14. ignore_list.append(periph)
  15. check_str = "%s(%d,%d) %s(%d,%d)" % (
  16. other_periph, curr_dict[other_periph][0],
  17. curr_dict[other_periph][1], periph, dma_stream[0],
  18. dma_stream[1])
  19. #check if we did this before
  20. if check_str in check_list:
  21. return False
  22. check_list.append(check_str)
  23. if debug:
  24. print("Trying to Resolve Conflict: ", check_str)
  25. #check if we can resolve by swapping with other periphs
  26. for streamchan in dma_map[other_periph]:
  27. stream = (streamchan[0], streamchan[1])
  28. if stream != curr_dict[other_periph] and check_possibility(other_periph, stream, curr_dict, dma_map, check_list, False):
  29. if not recurse:
  30. curr_dict[other_periph] = stream
  31. return True
  32. return False
  33. return True
  34. def can_share(periph, noshare_list):
  35. '''check if a peripheral is in the SHARED_MAP list'''
  36. for noshare in noshare_list:
  37. if fnmatch.fnmatch(periph, noshare):
  38. return False
  39. for f in SHARED_MAP:
  40. if fnmatch.fnmatch(periph, f):
  41. return True
  42. if debug:
  43. print("%s can't share" % periph)
  44. return False
  45. # list of peripherals on H7 that are on DMAMUX2 and BDMA
  46. have_DMAMUX = False
  47. DMAMUX2_peripherals = [ 'I2C4', 'SPI6', 'ADC3' ]
  48. def dmamux_channel(key):
  49. '''return DMAMUX channel for H7'''
  50. for p in DMAMUX2_peripherals:
  51. if key.find(p) != -1:
  52. return 'STM32_DMAMUX2_' + key
  53. # default to DMAMUX1
  54. return 'STM32_DMAMUX1_' + key
  55. def dma_name(key):
  56. '''return 'DMA' or 'BDMA' based on peripheral name'''
  57. if not have_DMAMUX:
  58. return "DMA"
  59. for p in DMAMUX2_peripherals:
  60. if key.find(p) != -1:
  61. return 'BDMA'
  62. return 'DMA'
  63. def chibios_dma_define_name(key):
  64. '''return define name needed for board.h for ChibiOS'''
  65. dma_key = key + '_' + dma_name(key)
  66. if key.startswith('ADC'):
  67. return 'STM32_ADC_%s_' % dma_key
  68. elif key.startswith('SPI'):
  69. return 'STM32_SPI_%s_' % dma_key
  70. elif key.startswith('I2C'):
  71. return 'STM32_I2C_%s_' % dma_key
  72. elif key.startswith('USART'):
  73. return 'STM32_UART_%s_' % dma_key
  74. elif key.startswith('UART'):
  75. return 'STM32_UART_%s_' % dma_key
  76. elif key.startswith('SDIO') or key.startswith('SDMMC'):
  77. return 'STM32_SDC_%s_' % dma_key
  78. elif key.startswith('TIM'):
  79. return 'STM32_TIM_%s_' % dma_key
  80. else:
  81. print("Error: Unknown key type %s" % key)
  82. sys.exit(1)
  83. def get_list_index(peripheral, priority_list):
  84. '''return index into priority_list for a peripheral'''
  85. for i in range(len(priority_list)):
  86. str = priority_list[i]
  87. if fnmatch.fnmatch(peripheral, str):
  88. return i
  89. # default to max priority
  90. return len(priority_list)
  91. def get_sharing_priority(periph_list, priority_list):
  92. '''get priority of a list of peripherals we could share with'''
  93. highest = len(priority_list)
  94. for p in periph_list:
  95. prio = get_list_index(p, priority_list)
  96. if prio < highest:
  97. highest = prio
  98. return highest
  99. def generate_DMAMUX_map_mask(peripheral_list, channel_mask, noshare_list, dma_exclude):
  100. '''
  101. generate a dma map suitable for a board with a DMAMUX
  102. In principle any peripheral can use any stream, but we need to
  103. ensure that a peripheral doesn't try to use the same stream as its
  104. partner (eg. a RX/TX pair)
  105. '''
  106. dma_map = {}
  107. idsets = {}
  108. # first unshareable peripherals
  109. available = channel_mask
  110. for p in peripheral_list:
  111. dma_map[p] = []
  112. idsets[p] = set()
  113. for p in peripheral_list:
  114. if can_share(p, noshare_list) or p in dma_exclude:
  115. continue
  116. for i in range(16):
  117. mask = (1<<i)
  118. if available & mask != 0:
  119. available &= ~mask
  120. dma = (i // 8) + 1
  121. stream = i % 8
  122. dma_map[p].append((dma,stream,0))
  123. idsets[p].add(i)
  124. break
  125. if debug:
  126. print('dma_map1: ', dma_map)
  127. print('available: 0x%04x' % available)
  128. # now shareable
  129. idx = 0
  130. for p in peripheral_list:
  131. if not can_share(p, noshare_list) or p in dma_exclude:
  132. continue
  133. base = idx % 16
  134. for i in range(16):
  135. found = None
  136. for ii in list(range(base,16)) + list(range(0,base)):
  137. if (1<<ii) & available == 0:
  138. continue
  139. dma = (ii // 8) + 1
  140. stream = ii % 8
  141. if (dma,stream) in dma_map[p]:
  142. continue
  143. # prevent attempts to share with other half of same peripheral
  144. if p.endswith('RX'):
  145. other = p[:-2] + 'TX'
  146. elif p.endswith('TX'):
  147. other = p[:-2] + 'RX'
  148. else:
  149. other = None
  150. if other is not None and ii in idsets[other]:
  151. if len(idsets[p]) >= len(idsets[other]) and len(idsets[other]) > 0:
  152. continue
  153. idsets[other].remove(ii)
  154. dma_map[other].remove((dma,stream))
  155. found = ii
  156. break
  157. if found is None:
  158. continue
  159. base = (found+1) % 16
  160. dma = (found // 8) + 1
  161. stream = found % 8
  162. dma_map[p].append((dma,stream))
  163. idsets[p].add(found)
  164. idx = (idx+1) % 16
  165. if debug:
  166. print('dma_map: ', dma_map)
  167. print('idsets: ', idsets)
  168. print('available: 0x%04x' % available)
  169. return dma_map
  170. def generate_DMAMUX_map(peripheral_list, noshare_list, dma_exclude):
  171. '''
  172. generate a dma map suitable for a board with a DMAMUX1 and DMAMUX2
  173. '''
  174. # first split peripheral_list into those for DMAMUX1 and those for DMAMUX2
  175. dmamux1_peripherals = []
  176. dmamux2_peripherals = []
  177. for p in peripheral_list:
  178. if dma_name(p) == 'BDMA':
  179. dmamux2_peripherals.append(p)
  180. else:
  181. dmamux1_peripherals.append(p)
  182. map1 = generate_DMAMUX_map_mask(dmamux1_peripherals, 0xFFFF, noshare_list, dma_exclude)
  183. map2 = generate_DMAMUX_map_mask(dmamux2_peripherals, 0xFF, noshare_list, dma_exclude)
  184. # translate entries from map2 to "DMA controller 3", which is used for BDMA
  185. for p in map2.keys():
  186. streams = []
  187. for (controller,stream) in map2[p]:
  188. streams.append((3,stream))
  189. map2[p] = streams
  190. both = map1
  191. both.update(map2)
  192. if debug:
  193. print('dma_map_both: ', both)
  194. return both
  195. def write_dma_header(f, peripheral_list, mcu_type, dma_exclude=[],
  196. dma_priority='', dma_noshare=''):
  197. '''write out a DMA resolver header file'''
  198. global dma_map, have_DMAMUX
  199. # form a list of DMA priorities
  200. priority_list = dma_priority.split()
  201. # sort by priority
  202. peripheral_list = sorted(peripheral_list, key=lambda x: get_list_index(x, priority_list))
  203. # form a list of peripherals that can't share
  204. noshare_list = dma_noshare.split()
  205. try:
  206. lib = importlib.import_module(mcu_type)
  207. if hasattr(lib, "DMA_Map"):
  208. dma_map = lib.DMA_Map
  209. else:
  210. return
  211. except ImportError:
  212. print("Unable to find module for MCU %s" % mcu_type)
  213. sys.exit(1)
  214. if dma_map is None:
  215. have_DMAMUX = True
  216. dma_map = generate_DMAMUX_map(peripheral_list, noshare_list, dma_exclude)
  217. print("Writing DMA map")
  218. unassigned = []
  219. curr_dict = {}
  220. for periph in peripheral_list:
  221. if periph in dma_exclude:
  222. continue
  223. assigned = False
  224. check_list = []
  225. if not periph in dma_map:
  226. print("Unknown peripheral function %s in DMA map for %s" %
  227. (periph, mcu_type))
  228. sys.exit(1)
  229. for streamchan in dma_map[periph]:
  230. stream = (streamchan[0], streamchan[1])
  231. if check_possibility(periph, stream, curr_dict, dma_map,
  232. check_list):
  233. curr_dict[periph] = stream
  234. assigned = True
  235. break
  236. if assigned == False:
  237. unassigned.append(periph)
  238. if debug:
  239. print('curr_dict: ', curr_dict)
  240. print('unassigned: ', unassigned)
  241. # now look for shared DMA possibilities
  242. stream_assign = {}
  243. for k in curr_dict.keys():
  244. p = curr_dict[k]
  245. if not p in stream_assign:
  246. stream_assign[p] = [k]
  247. else:
  248. stream_assign[p].append(k)
  249. unassigned_new = unassigned[:]
  250. for periph in unassigned:
  251. share_possibility = []
  252. for streamchan in dma_map[periph]:
  253. stream = (streamchan[0], streamchan[1])
  254. share_ok = True
  255. for periph2 in stream_assign[stream]:
  256. if not can_share(periph, noshare_list) or not can_share(periph2, noshare_list):
  257. share_ok = False
  258. if share_ok:
  259. share_possibility.append(stream)
  260. if share_possibility:
  261. # sort the possible sharings so minimise impact on high priority streams
  262. share_possibility = sorted(share_possibility, key=lambda x: get_sharing_priority(stream_assign[x], priority_list))
  263. # and take the one with the least impact (lowest value for highest priority stream share)
  264. stream = share_possibility[-1]
  265. if debug:
  266. print("Sharing %s on %s with %s" % (periph, stream,
  267. stream_assign[stream]))
  268. curr_dict[periph] = stream
  269. stream_assign[stream].append(periph)
  270. unassigned_new.remove(periph)
  271. unassigned = unassigned_new
  272. if debug:
  273. print(stream_assign)
  274. f.write("\n\n// auto-generated DMA mapping from dma_resolver.py\n")
  275. if unassigned:
  276. f.write(
  277. "\n// Note: The following peripherals can't be resolved for DMA: %s\n\n"
  278. % unassigned)
  279. for key in sorted(curr_dict.keys()):
  280. stream = curr_dict[key]
  281. shared = ''
  282. if len(stream_assign[stream]) > 1:
  283. shared = ' // shared %s' % ','.join(stream_assign[stream])
  284. if curr_dict[key] == "STM32_DMA_STREAM_ID_ANY":
  285. f.write("#define %-30s STM32_DMA_STREAM_ID_ANY\n" % (chibios_dma_define_name(key)+'STREAM'))
  286. f.write("#define %-30s %s\n" % (chibios_dma_define_name(key)+'CHAN', dmamux_channel(key)))
  287. continue
  288. else:
  289. dma_controller = curr_dict[key][0]
  290. if dma_controller == 3:
  291. # for BDMA we use 3 in the resolver
  292. dma_controller = 1
  293. f.write("#define %-30s STM32_DMA_STREAM_ID(%u, %u)%s\n" %
  294. (chibios_dma_define_name(key)+'STREAM', dma_controller,
  295. curr_dict[key][1], shared))
  296. for streamchan in dma_map[key]:
  297. if stream == (streamchan[0], streamchan[1]):
  298. if have_DMAMUX:
  299. chan = dmamux_channel(key)
  300. else:
  301. chan = streamchan[2]
  302. f.write("#define %-30s %s\n" %
  303. (chibios_dma_define_name(key)+'CHAN', chan))
  304. break
  305. # now generate UARTDriver.cpp DMA config lines
  306. f.write("\n\n// generated UART DMA configuration lines\n")
  307. for u in range(1, 9):
  308. key = None
  309. if 'USART%u_TX' % u in peripheral_list:
  310. key = 'USART%u' % u
  311. if 'UART%u_TX' % u in peripheral_list:
  312. key = 'UART%u' % u
  313. if 'USART%u_RX' % u in peripheral_list:
  314. key = 'USART%u' % u
  315. if 'UART%u_RX' % u in peripheral_list:
  316. key = 'UART%u' % u
  317. if key is None:
  318. continue
  319. if have_DMAMUX:
  320. # use DMAMUX ID as channel number
  321. dma_rx_chn = dmamux_channel(key + "_RX")
  322. dma_tx_chn = dmamux_channel(key + "_TX")
  323. else:
  324. dma_rx_chn = "STM32_UART_%s_RX_%s_CHAN" % (key, dma_name(key))
  325. dma_tx_chn = "STM32_UART_%s_TX_%s_CHAN" % (key, dma_name(key))
  326. f.write("#define STM32_%s_RX_DMA_CONFIG " % key)
  327. if key + "_RX" in curr_dict:
  328. f.write(
  329. "true, STM32_UART_%s_RX_%s_STREAM, %s\n" % (key, dma_name(key), dma_rx_chn))
  330. else:
  331. f.write("false, 0, 0\n")
  332. f.write("#define STM32_%s_TX_DMA_CONFIG " % key)
  333. if key + "_TX" in curr_dict:
  334. f.write(
  335. "true, STM32_UART_%s_TX_%s_STREAM, %s\n" % (key, dma_name(key), dma_tx_chn))
  336. else:
  337. f.write("false, 0, 0\n")
  338. # now generate SPI DMA streams lines
  339. f.write("\n\n// generated SPI DMA configuration lines\n")
  340. for u in range(1, 9):
  341. if 'SPI%u_TX' % u in peripheral_list and 'SPI%u_RX' % u in peripheral_list:
  342. key = 'SPI%u' % u
  343. else:
  344. continue
  345. f.write('#define STM32_SPI_%s_DMA_STREAMS STM32_SPI_%s_TX_%s_STREAM, STM32_SPI_%s_RX_%s_STREAM\n' % (
  346. key, key, dma_name(key), key, dma_name(key)))
  347. if __name__ == '__main__':
  348. import optparse
  349. parser = optparse.OptionParser("dma_resolver.py")
  350. parser.add_option("-M", "--mcu", default=None, help='MCU type')
  351. parser.add_option(
  352. "-D", "--debug", action='store_true', help='enable debug')
  353. parser.add_option(
  354. "-P",
  355. "--peripherals",
  356. default=None,
  357. help='peripheral list (comma separated)')
  358. opts, args = parser.parse_args()
  359. if opts.peripherals is None:
  360. print("Please provide a peripheral list with -P")
  361. sys.exit(1)
  362. if opts.mcu is None:
  363. print("Please provide a MCU type with -<")
  364. sys.exit(1)
  365. debug = opts.debug
  366. plist = opts.peripherals.split(',')
  367. mcu_type = opts.mcu
  368. f = open("dma.h", "w")
  369. write_dma_header(f, plist, mcu_type)