aboutsummaryrefslogtreecommitdiff
path: root/infra/base-images/base-builder/detect_repo.py
blob: e677e102329f2c7b3efb49eb77df19f83b46ce9f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module to get the name of a git repo containing a specific commit
inside of an OSS-Fuzz project.

Example Usage:

  python detect_repo.py --src_dir /src --example_commit
    b534f03eecd8a109db2b085ab24d419b6486de97

Prints 'Detected repo:' followed by the URL of the git remote and the path of
the directory containing the repo, separated by spaces.

  Detected repo: https://github.com/VirusTotal/yara.git /src/yara

"""
import argparse
import logging
import os
import subprocess

# Extra directory tree (presumably the image's GOPATH source root) that
# get_dirs_to_search() walks for directories whose name contains the repo name.
GO_PATH = '/root/go/src/'


def main():
  """Finds the git repo of an OSS-Fuzz project and prints its URL and path.

  Parses --repo_name, --src_dir and --example_commit from the command line and
  checks every candidate directory returned by get_dirs_to_search(). The first
  directory that contains the example commit, or whose origin remote is named
  like the repo name, is printed as 'Detected repo: <url> <path>'. If no
  directory matches, an error is logged for each search criterion used.

  Raises:
    ValueError: when neither an example commit nor a repo name is provided.
  """
  parser = argparse.ArgumentParser(
      description=
      'Finds a specific git repo in an oss-fuzz project\'s docker file.')
  parser.add_argument('--repo_name', help='The name of the git repo.')
  parser.add_argument('--src_dir', help='The location of the possible repo.')
  parser.add_argument('--example_commit',
                      help='A commit SHA referencing the project\'s main repo.')

  args = parser.parse_args()
  if not args.repo_name and not args.example_commit:
    raise ValueError(
        'Requires an example commit or a repo name to find repo location.')
  # An explicit --src_dir wins; otherwise use $SRC, defaulting to /src.
  src_dir = args.src_dir or os.environ.get('SRC', '/src')

  for single_dir in get_dirs_to_search(src_dir, args.repo_name):
    # Candidates that are already absolute paths are left unchanged by
    # os.path.join; bare directory names are resolved against src_dir.
    full_path = os.path.join(src_dir, single_dir)
    if not os.path.isdir(full_path):
      continue
    # The commit check takes precedence; the name check is only tried when the
    # commit check was not requested or did not match.
    matched = ((args.example_commit and
                check_for_commit(full_path, args.example_commit)) or
               (args.repo_name and
                check_for_repo_name(full_path, args.repo_name)))
    if matched:
      print('Detected repo:', get_repo(full_path), full_path)
      return

  # Report the criteria that were actually used, so a search by name does not
  # log a misleading "commit: None".
  if args.example_commit:
    logging.error('No git repos with specific commit: %s found in %s',
                  args.example_commit, src_dir)
  if args.repo_name:
    logging.error('No git repos with name: %s found in %s', args.repo_name,
                  src_dir)


def get_dirs_to_search(src_dir, repo_name):
  """Collects the candidate directories that may hold the main git repo.

  The result always starts with the entries of |src_dir| (bare names, as
  returned by os.listdir). When GO_PATH exists and a repo name is given, every
  directory below GO_PATH whose name contains |repo_name| is added as a full
  path.

  Args:
    src_dir: The location set for the projects SRC.
    repo_name: The name of the repo you are searching for (may be None).

  Returns:
    A list of directories to search.
  """
  candidates = os.listdir(src_dir)
  if not (os.path.exists(GO_PATH) and repo_name):
    return candidates

  for parent, subdirs, _ in os.walk(GO_PATH):
    # Substring match: any directory name containing the repo name counts.
    candidates.extend(
        os.path.join(parent, subdir)
        for subdir in subdirs
        if repo_name in subdir)
  return candidates


def get_repo(repo_path):
  """Looks up the origin remote URL of the git repo in a directory.

  The git command is run through execute() with check_result=True, so a
  failing command raises from execute() rather than returning here.

  Args:
    repo_path: The directory on the image where the git repo exists.

  Returns:
    The remote URL with trailing whitespace removed, or None when the command
    produced no output.
  """
  command = ['git', 'config', '--get', 'remote.origin.url']
  stdout, exit_code = execute(command, location=repo_path, check_result=True)
  if exit_code != 0 or not stdout:
    return None
  return stdout.rstrip()


def check_for_repo_name(repo_path, expected_repo_name):
  """Returns True if the origin remote of the repo at |repo_path| is named
  |expected_repo_name|.

  The repo name is the last '/'-separated component of the remote URL, with
  trailing whitespace, trailing slashes and one trailing '.git' suffix removed.

  Args:
    repo_path: The directory of a git repo.
    expected_repo_name: The name of the target git repo.

  Returns:
    True if the names match, False otherwise (including when |repo_path| has
    no '.git' entry).
  """
  if not os.path.exists(os.path.join(repo_path, '.git')):
    return False

  repo_url, _ = execute(['git', 'config', '--get', 'remote.origin.url'],
                        location=repo_path)
  # Handle two common cases:
  # https://github.com/google/syzkaller/
  # https://github.com/google/syzkaller.git
  repo_url = repo_url.rstrip().rstrip('/')
  # Only drop '.git' when it is the suffix; removing every occurrence would
  # mangle names such as 'user.github.io'.
  if repo_url.endswith('.git'):
    repo_url = repo_url[:-len('.git')].rstrip('/')
  actual_repo_name = repo_url.split('/')[-1]
  return actual_repo_name == expected_repo_name


def check_for_commit(repo_path, commit):
  """Tells whether the git repo in a directory contains a given commit.

  Args:
    repo_path: The name of the directory to test for the commit.
    commit: The commit SHA to check for.

  Returns:
    True if directory contains that commit, False otherwise (including when
    the directory has no '.git' entry).
  """
  git_dir = os.path.join(repo_path, '.git')

  # Without a .git entry this is not treated as a git repo.
  if not os.path.exists(git_dir):
    return False

  # A 'shallow' file marks a truncated history; fetch the rest before looking.
  if os.path.exists(os.path.join(git_dir, 'shallow')):
    execute(['git', 'fetch', '--unshallow'], location=repo_path)

  # 'git cat-file -e' exits with 0 only when the object exists.
  exit_code = execute(['git', 'cat-file', '-e', commit], location=repo_path)[1]
  return exit_code == 0


def execute(command, location, check_result=False):
  """Runs a command in the specified directory location.

  The command is started directly from the argument list (no shell). Only
  stdout is captured; stderr is inherited from this process.

  Args:
    command: The command as a list to be run.
    location: The directory the command is run in.
    check_result: Should an exception be thrown on failed command.

  Returns:
    The stdout of the command decoded as text, the error code.

  Raises:
    RuntimeError: running a command resulted in an error.
  """
  process = subprocess.Popen(command, stdout=subprocess.PIPE, cwd=location)
  # stderr is not piped, so |err| is always None; it is kept only so the error
  # message below retains its shape.
  output, err = process.communicate()
  if output is not None:
    # Decode leniently: output such as URLs or paths may contain non-ASCII
    # bytes, which must not crash the caller with a UnicodeDecodeError.
    output = output.decode('utf-8', 'replace')
  if check_result and (process.returncode or err):
    raise RuntimeError(
        'Error: %s\n running command: %s\n return code: %s\n out %s\n' %
        (err, command, process.returncode, output))
  return output, process.returncode


# Run the repo detection only when this file is executed as a script.
if __name__ == '__main__':
  main()