aboutsummaryrefslogtreecommitdiff
path: root/projects/ffmpeg/group_seed_corpus.py
blob: 1e1d51cd6184fd376fb36d7757f89ef016906c61 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################

from __future__ import print_function
import logging
import os
import re
import sys
import zipfile


# Configure the root logger to emit INFO-level messages; the format string
# prefixes every message with the literal text 'INFO: '.
logging.basicConfig(level=logging.INFO, format='INFO: %(message)s')
# Captures the codec part of a fuzzer name: the shortest text found between
# 'codec_id_' and a following '_fuzzer'.
CODEC_NAME_REGEXP = re.compile(r'codec_id_(.+?)_fuzzer')


def get_fuzzer_tags(fuzzer_name):
  """Derive the tags used to filter corpus samples from a fuzzer name.

  The name is lower-cased first. A 'sub' tag is added when the name contains
  'subtitle'. If the name matches CODEC_NAME_REGEXP, the captured codec name
  is split on '_' and every non-empty part contributes one tag of at most
  three characters.

  Returns:
    A list of lower-case tag strings; it may be empty and may hold duplicates.
  """
  lowered_name = fuzzer_name.lower()
  # Subtitle samples live in the 'sub' directory, so that tag is added by hand.
  tags = ['sub'] if 'subtitle' in lowered_name else []

  match = CODEC_NAME_REGEXP.search(lowered_name)
  if not match:
    return tags

  # Compound names such as 'adpcm_ima_wav' contribute one tag per part.
  for part in match.group(1).split('_'):
    # Keep only what precedes a generic word: 'mpeg1video' -> 'mpeg1',
    # 'msvideo1' -> 'ms'.
    for generic_word in ('video', 'audio', 'subtitle', 'text'):
      part = part.split(generic_word)[0]
    # Codec names may carry trailing characters ('vp6f', 'flv1', 'jpegls'), so
    # only the first three characters are used; slicing leaves shorter names
    # untouched.
    if part:
      tags.append(part[:3])

  return tags


def parse_corpus(corpus_directory):
  """Collect the paths of all sample files below the corpus directory.

  The tree is walked recursively with os.walk(). Files whose name contains
  'md5sum' are checksum files and are left out of the result.

  Returns:
    A list of paths, each one built by joining the walked directory with the
    file name.
  """
  all_corpus_files = [
      os.path.join(parent, name)
      for parent, _, names in os.walk(corpus_directory)
      for name in names
      # Checksum files are useless in a corpus.
      if 'md5sum' not in name
  ]

  summary = 'Parsed %d corpus files from %s' % (len(all_corpus_files),
                                                corpus_directory)
  logging.info(summary)
  return all_corpus_files


def parse_fuzzers(fuzzers_directory):
  """List the ffmpeg fuzzers located directly in the given directory.

  Only the direct entries returned by os.listdir() are examined;
  subdirectories are not descended into. An entry is kept when its name
  starts with 'ffmpeg_' and ends with '_fuzzer'.

  Returns:
    A list of paths, each one being the directory joined with the entry name.
  """
  all_fuzzers = [
      os.path.join(fuzzers_directory, entry)
      for entry in os.listdir(fuzzers_directory)
      # Everything that is not an ffmpeg fuzzer is ignored.
      if entry.startswith('ffmpeg_') and entry.endswith('_fuzzer')
  ]

  summary = 'Parsed %d fuzzers from %s' % (len(all_fuzzers),
                                           fuzzers_directory)
  logging.info(summary)
  return all_fuzzers
 

def _select_corpus_files(sanitized_corpus, tags):
  """Return the set of original paths whose sanitized form contains a tag."""
  return set(filename for filename, sanitized_filename in sanitized_corpus
             if any(tag in sanitized_filename for tag in tags))


def zip_relevant_corpus(corpus_files, fuzzers):
  """Find relevant corpus files and archive them for every fuzzer given.

  A corpus file is relevant for a fuzzer when its path, with every 'ffmpeg'
  substring removed and then lower-cased, contains one of the tags returned by
  get_fuzzer_tags() for the fuzzer's base name. If no file matches at all, the
  search is repeated once with the last character of every tag stripped.

  The selected files are written to '<fuzzer path>_seed_corpus.zip'. No
  archive is created for a fuzzer without relevant files.

  Args:
    corpus_files: iterable of corpus file paths.
    fuzzers: iterable of fuzzer paths.
  """
  # Sanitize every path once instead of once per fuzzer. The 'ffmpeg'
  # substring is removed so that tags of 'MPEG' codecs do not match everything.
  sanitized_corpus = [(filename, filename.replace('ffmpeg', '').lower())
                      for filename in corpus_files]

  for fuzzer in fuzzers:
    fuzzer_name = os.path.basename(fuzzer)
    fuzzer_tags = get_fuzzer_tags(fuzzer_name)
    relevant_corpus_files = _select_corpus_files(sanitized_corpus, fuzzer_tags)

    if not relevant_corpus_files:
      # Strip last symbol from tags only after the whole corpus was searched
      # without success. It helps for such codecs as 'RV40' ('RV4' -> 'RV') or
      # 'PCX' (-> 'PC'). One-character tags are dropped: stripping them gives
      # an empty string, which is contained in every path.
      shortened_tags = [tag[:-1] for tag in fuzzer_tags if len(tag) > 1]
      relevant_corpus_files = _select_corpus_files(sanitized_corpus,
                                                   shortened_tags)

    logging.info(
        'Found %d relevant samples for %s' % (len(relevant_corpus_files),
                                              fuzzer_name))

    if not relevant_corpus_files:
      continue

    zip_archive_name = fuzzer + "_seed_corpus.zip"
    with zipfile.ZipFile(zip_archive_name, 'w') as archive:
      # Sorted so that the member order does not depend on set iteration order.
      for filename in sorted(relevant_corpus_files):
        archive.write(filename)


def main():
  """Group the seed corpus per fuzzer using the command line arguments.

  Expects two positional arguments: the seed corpus directory followed by the
  fuzzers directory. When fewer are given, a usage line is printed and the
  process exits with status 1.
  """
  arguments = sys.argv[1:]
  if len(arguments) < 2:
    print('Usage: %s <seed_corpus_directory> <fuzzers_directory>' % __file__)
    sys.exit(1)

  seed_corpus_directory, fuzzers_directory = arguments[:2]

  # The corpus is parsed before the fuzzers, then archives are written.
  zip_relevant_corpus(parse_corpus(seed_corpus_directory),
                      parse_fuzzers(fuzzers_directory))


# Run main() only when this file is executed as a script; its return value is
# handed to sys.exit() as the exit status.
if __name__ == '__main__':
  sys.exit(main())